This is an automated email from the ASF dual-hosted git repository.
pbacsko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git
The following commit(s) were added to refs/heads/master by this push:
new a2d3d43a [YUNIKORN-2832] [core] Add non-Yunikorn allocation tracking
logic (#975)
a2d3d43a is described below
commit a2d3d43a145d3f5860fd8649d036be91d96317f6
Author: Peter Bacsko <[email protected]>
AuthorDate: Wed Oct 2 11:57:36 2024 +0200
[YUNIKORN-2832] [core] Add non-Yunikorn allocation tracking logic (#975)
Closes: #975
Signed-off-by: Peter Bacsko <[email protected]>
---
go.mod | 2 +-
go.sum | 4 +-
pkg/scheduler/context.go | 2 +-
pkg/scheduler/context_test.go | 100 ++++++++++++++++++++-
pkg/scheduler/objects/allocation.go | 26 ++++++
pkg/scheduler/objects/allocation_test.go | 33 +++++++
pkg/scheduler/objects/node.go | 51 +++++++++--
pkg/scheduler/objects/node_test.go | 149 ++++++++++++++++++++++++++++++-
pkg/scheduler/objects/utilities_test.go | 35 +++++---
pkg/scheduler/partition.go | 77 +++++++++++++++-
pkg/scheduler/partition_test.go | 61 ++++++++++++-
pkg/scheduler/tests/operation_test.go | 108 ++++++++++++++++++++++
pkg/scheduler/utilities_test.go | 33 +++++++
pkg/webservice/dao/allocation_info.go | 9 ++
pkg/webservice/dao/node_info.go | 27 +++---
pkg/webservice/handlers.go | 48 +++++++---
pkg/webservice/handlers_test.go | 48 +++++++++-
17 files changed, 752 insertions(+), 61 deletions(-)
diff --git a/go.mod b/go.mod
index 16d56008..26e243e8 100644
--- a/go.mod
+++ b/go.mod
@@ -22,7 +22,7 @@ module github.com/apache/yunikorn-core
go 1.21
require (
- github.com/apache/yunikorn-scheduler-interface
v0.0.0-20240827015655-68e8c6cca28a
+ github.com/apache/yunikorn-scheduler-interface
v0.0.0-20240924203603-aaf51c93d3a0
github.com/google/btree v1.1.2
github.com/google/go-cmp v0.6.0
github.com/google/uuid v1.6.0
diff --git a/go.sum b/go.sum
index 61b3fa66..026aa4f7 100644
--- a/go.sum
+++ b/go.sum
@@ -1,5 +1,5 @@
-github.com/apache/yunikorn-scheduler-interface
v0.0.0-20240827015655-68e8c6cca28a
h1:3WRXGTvhunGBZj8AVZDxx7Bs/AXiH9mvf2jYcuDyklA=
-github.com/apache/yunikorn-scheduler-interface
v0.0.0-20240827015655-68e8c6cca28a/go.mod
h1:co3uU98sj1CUTPNTM13lTyi+CY0DOgDndDW2KiUjktU=
+github.com/apache/yunikorn-scheduler-interface
v0.0.0-20240924203603-aaf51c93d3a0
h1:/9j0YXuifvoOl4YVEbO0r+DPkkYLzaQ+/ac+xCc7SY8=
+github.com/apache/yunikorn-scheduler-interface
v0.0.0-20240924203603-aaf51c93d3a0/go.mod
h1:co3uU98sj1CUTPNTM13lTyi+CY0DOgDndDW2KiUjktU=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod
h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/cespare/xxhash/v2 v2.3.0
h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
diff --git a/pkg/scheduler/context.go b/pkg/scheduler/context.go
index b0df469a..9a22b41e 100644
--- a/pkg/scheduler/context.go
+++ b/pkg/scheduler/context.go
@@ -759,7 +759,7 @@ func (cc *ClusterContext) processAllocations(request
*si.AllocationRequest) {
continue
}
// at some point, we may need to handle new requests as well
- if newAlloc {
+ if newAlloc && !alloc.IsForeign() {
cc.notifyRMNewAllocation(request.RmID, alloc)
}
}
diff --git a/pkg/scheduler/context_test.go b/pkg/scheduler/context_test.go
index 075ab563..ea9d3414 100644
--- a/pkg/scheduler/context_test.go
+++ b/pkg/scheduler/context_test.go
@@ -37,9 +37,10 @@ import (
const pName = "default"
type mockEventHandler struct {
- eventHandled bool
- rejectedNodes []*si.RejectedNode
- acceptedNodes []*si.AcceptedNode
+ eventHandled bool
+ rejectedNodes []*si.RejectedNode
+ acceptedNodes []*si.AcceptedNode
+ newAllocHandler func(*rmevent.RMNewAllocationsEvent)
}
func newMockEventHandler() *mockEventHandler {
@@ -56,6 +57,10 @@ func (m *mockEventHandler) HandleEvent(ev interface{}) {
m.rejectedNodes = append(m.rejectedNodes,
nodeEvent.RejectedNodes...)
m.acceptedNodes = append(m.acceptedNodes,
nodeEvent.AcceptedNodes...)
}
+
+ if allocEvent, ok := ev.(*rmevent.RMNewAllocationsEvent); ok &&
m.newAllocHandler != nil {
+ m.newAllocHandler(allocEvent)
+ }
}
func createTestContext(t *testing.T, partitionName string) *ClusterContext {
@@ -71,7 +76,13 @@ func createTestContext(t *testing.T, partitionName string)
*ClusterContext {
Name: "root",
Parent: true,
SubmitACL: "*",
- Queues: nil,
+ Queues: []configs.QueueConfig{
+ {
+ Name: "default",
+ Parent: false,
+ SubmitACL: "*",
+ },
+ },
},
},
}
@@ -296,6 +307,87 @@ func TestContextDrainingNodeBackToSchedulableMetrics(t
*testing.T) {
verifyMetrics(t, 0, "draining")
}
+func TestContext_OnAllocationNotification(t *testing.T) {
+ context := createTestContext(t, pName)
+ eventHandler := context.rmEventHandler.(*mockEventHandler)
//nolint:errcheck
+ var lastAllocEvent *rmevent.RMNewAllocationsEvent
+ eventHandler.newAllocHandler = func(event
*rmevent.RMNewAllocationsEvent) {
+ lastAllocEvent = event
+ go func() {
+ event.Channel <- &rmevent.Result{Succeeded: true}
+ }()
+ }
+
+ n := getNodeInfoForAddingNode()
+ err := context.addNode(n, true)
+ assert.NilError(t, err, "unexpected error returned from addNode")
+ partition := context.GetPartition(pName)
+ assert.Assert(t, partition != nil)
+ assert.Equal(t, 1, len(partition.GetNodes()), "expected node not found
on partition")
+
+ // register application
+ appReq := &si.ApplicationRequest{
+ New: []*si.AddApplicationRequest{
+ {
+ QueueName: defQueue,
+ PartitionName: pName,
+ Ugi: &si.UserGroupInformation{
+ User: "testuser",
+ Groups: []string{"testgroup"},
+ },
+ ApplicationID: appID1,
+ },
+ },
+ RmID: "rm:123",
+ }
+
context.handleRMUpdateApplicationEvent(&rmevent.RMUpdateApplicationEvent{Request:
appReq})
+
+ // add a Yunikorn allocation
+ allocReq := &si.AllocationRequest{
+ Allocations: []*si.Allocation{
+ {
+ AllocationKey: allocKey,
+ ResourcePerAlloc: &si.Resource{
+ Resources: map[string]*si.Quantity{
+ "first": {Value: 1},
+ },
+ },
+ ApplicationID: appID1,
+ NodeID: "test-1",
+ PartitionName: pName,
+ },
+ },
+ RmID: "rm:123",
+ }
+
context.handleRMUpdateAllocationEvent(&rmevent.RMUpdateAllocationEvent{Request:
allocReq})
+ assert.Assert(t, lastAllocEvent != nil)
+ assert.Equal(t, lastAllocEvent.Allocations[0].AllocationKey, allocKey)
+
+ // add a non-Yunikorn allocation
+ lastAllocEvent = nil
+ nonYkAllocReq := &si.AllocationRequest{
+ Allocations: []*si.Allocation{
+ {
+ AllocationKey: "foreign-alloc-1",
+ ResourcePerAlloc: &si.Resource{
+ Resources: map[string]*si.Quantity{
+ "first": {Value: 1},
+ },
+ },
+ AllocationTags: map[string]string{
+ siCommon.Foreign:
siCommon.AllocTypeDefault,
+ },
+ NodeID: "test-1",
+ PartitionName: pName,
+ },
+ },
+ RmID: "rm:123",
+ }
+
+
context.handleRMUpdateAllocationEvent(&rmevent.RMUpdateAllocationEvent{Request:
nonYkAllocReq})
+ assert.Assert(t, lastAllocEvent == nil, "unexpected allocation event")
+}
+
func getNodeInfoForAddingNode() *si.NodeInfo {
n := &si.NodeInfo{
NodeID: "test-1",
diff --git a/pkg/scheduler/objects/allocation.go
b/pkg/scheduler/objects/allocation.go
index 8f2450e9..37d0bd61 100644
--- a/pkg/scheduler/objects/allocation.go
+++ b/pkg/scheduler/objects/allocation.go
@@ -49,6 +49,8 @@ type Allocation struct {
originator bool
tags map[string]string
resKeyWithoutNode string // the reservation key without node
+ foreign bool
+ preemptable bool
// Mutable fields which need protection
allocated bool
@@ -106,6 +108,20 @@ func NewAllocationFromSI(alloc *si.Allocation) *Allocation
{
createTime = time.Unix(siCreationTime, 0)
}
+ foreign := false
+ preemptable := true
+ if foreignType, ok := alloc.AllocationTags[siCommon.Foreign]; ok {
+ foreign = true
+ switch foreignType {
+ case siCommon.AllocTypeStatic:
+ preemptable = false
+ case siCommon.AllocTypeDefault:
+ default:
+ log.Log(log.SchedAllocation).Warn("Foreign tag has
illegal value, using default",
+ zap.String("value", foreignType))
+ }
+ }
+
var allocated bool
var nodeID string
var bindTime time.Time
@@ -135,6 +151,8 @@ func NewAllocationFromSI(alloc *si.Allocation) *Allocation {
allocated: allocated,
nodeID: nodeID,
bindTime: bindTime,
+ foreign: foreign,
+ preemptable: preemptable,
}
}
@@ -573,3 +591,11 @@ func (a *Allocation) setUserQuotaCheckPassed() {
a.askEvents.SendRequestFitsInUserQuota(a.allocationKey,
a.applicationID, a.allocatedResource)
}
}
+
+func (a *Allocation) IsForeign() bool {
+ return a.foreign
+}
+
+func (a *Allocation) IsPreemptable() bool {
+ return a.preemptable
+}
diff --git a/pkg/scheduler/objects/allocation_test.go
b/pkg/scheduler/objects/allocation_test.go
index 99a1593a..62bf316a 100644
--- a/pkg/scheduler/objects/allocation_test.go
+++ b/pkg/scheduler/objects/allocation_test.go
@@ -466,3 +466,36 @@ func TestNewAllocFromSI(t *testing.T) {
assert.Assert(t, !alloc.IsAllowPreemptSelf(), "alloc should not have
allow-preempt-self set")
assert.Assert(t, !alloc.IsAllowPreemptOther(), "alloc should not have
allow-preempt-other set")
}
+
+func TestNewForeignAllocFromSI(t *testing.T) {
+ res := resources.NewResourceFromMap(map[string]resources.Quantity{
+ "first": 1,
+ })
+ siAlloc := &si.Allocation{
+ AllocationKey: "foreign-1",
+ NodeID: "node-1",
+ ResourcePerAlloc: res.ToProto(),
+ TaskGroupName: "",
+ AllocationTags: map[string]string{
+ siCommon.Foreign: siCommon.AllocTypeDefault,
+ },
+ }
+
+ // default
+ alloc := NewAllocationFromSI(siAlloc)
+ assert.Assert(t, alloc.IsPreemptable())
+ assert.Assert(t, alloc.IsForeign())
+ assert.Equal(t, "foreign-1", alloc.GetAllocationKey())
+ assert.Equal(t, "node-1", alloc.GetNodeID())
+ assert.Assert(t, resources.Equals(res, alloc.GetAllocatedResource()))
+
+ // static
+ siAlloc.AllocationTags[siCommon.Foreign] = siCommon.AllocTypeStatic
+ alloc = NewAllocationFromSI(siAlloc)
+ assert.Assert(t, !alloc.IsPreemptable())
+
+ // illegal value for foreign type
+ siAlloc.AllocationTags[siCommon.Foreign] = "xyz"
+ alloc = NewAllocationFromSI(siAlloc)
+ assert.Assert(t, alloc.IsPreemptable())
+}
diff --git a/pkg/scheduler/objects/node.go b/pkg/scheduler/objects/node.go
index d4506320..a4907111 100644
--- a/pkg/scheduler/objects/node.go
+++ b/pkg/scheduler/objects/node.go
@@ -243,6 +243,31 @@ func (sn *Node) GetAllAllocations() []*Allocation {
return arr
}
+// GetYunikornAllocations returns a copy of Yunikorn allocations on this node
+func (sn *Node) GetYunikornAllocations() []*Allocation {
+ sn.RLock()
+ defer sn.RUnlock()
+ return sn.getAllocations(false)
+}
+
+// GetForeignAllocations returns a copy of non-Yunikorn allocations on this
node
+func (sn *Node) GetForeignAllocations() []*Allocation {
+ sn.RLock()
+ defer sn.RUnlock()
+ return sn.getAllocations(true)
+}
+
+func (sn *Node) getAllocations(foreign bool) []*Allocation {
+ arr := make([]*Allocation, 0)
+ for _, v := range sn.allocations {
+ if v.IsForeign() == foreign {
+ arr = append(arr, v)
+ }
+ }
+
+ return arr
+}
+
// Set the node to unschedulable.
// This will cause the node to be skipped during the scheduling cycle.
// Visible for testing only
@@ -312,15 +337,24 @@ func (sn *Node) FitInNode(resRequest *resources.Resource)
bool {
// is found the Allocation removed is returned. Used resources will decrease
available
// will increase as per the allocation removed.
func (sn *Node) RemoveAllocation(allocationKey string) *Allocation {
- defer sn.notifyListeners()
+ var alloc *Allocation
+ defer func() {
+ if alloc != nil && !alloc.IsForeign() {
+ sn.notifyListeners()
+ }
+ }()
sn.Lock()
defer sn.Unlock()
- alloc := sn.allocations[allocationKey]
+ alloc = sn.allocations[allocationKey]
if alloc != nil {
delete(sn.allocations, allocationKey)
- sn.allocatedResource.SubFrom(alloc.GetAllocatedResource())
- sn.allocatedResource.Prune()
+ if alloc.IsForeign() {
+ sn.occupiedResource =
resources.Sub(sn.occupiedResource, alloc.GetAllocatedResource())
+ } else {
+
sn.allocatedResource.SubFrom(alloc.GetAllocatedResource())
+ sn.allocatedResource.Prune()
+ }
sn.availableResource.AddTo(alloc.GetAllocatedResource())
sn.nodeEvents.SendAllocationRemovedEvent(sn.NodeID,
alloc.allocationKey, alloc.GetAllocatedResource())
return alloc
@@ -348,9 +382,10 @@ func (sn *Node) addAllocationInternal(alloc *Allocation,
force bool) bool {
return false
}
result := false
+ foreign := alloc.IsForeign()
defer func() {
// check result to ensure we don't notify listeners
unnecessarily
- if result {
+ if result && !foreign {
sn.notifyListeners()
}
}()
@@ -361,7 +396,11 @@ func (sn *Node) addAllocationInternal(alloc *Allocation,
force bool) bool {
res := alloc.GetAllocatedResource()
if force || sn.availableResource.FitIn(res) {
sn.allocations[alloc.GetAllocationKey()] = alloc
- sn.allocatedResource.AddTo(res)
+ if foreign {
+ sn.occupiedResource =
resources.Add(sn.occupiedResource, alloc.GetAllocatedResource())
+ } else {
+ sn.allocatedResource.AddTo(res)
+ }
sn.availableResource.SubFrom(res)
sn.availableResource.Prune()
sn.nodeEvents.SendAllocationAddedEvent(sn.NodeID,
alloc.allocationKey, res)
diff --git a/pkg/scheduler/objects/node_test.go
b/pkg/scheduler/objects/node_test.go
index c64fa76a..90a30e62 100644
--- a/pkg/scheduler/objects/node_test.go
+++ b/pkg/scheduler/objects/node_test.go
@@ -414,23 +414,54 @@ func TestAddAllocation(t *testing.T) {
assert.Assert(t, resources.IsZero(node.GetAllocatedResource()),
"removal of half allocation should return to zero %v, got %v", node,
node.GetAllocatedResource())
}
+func TestAddForeignAllocation(t *testing.T) {
+	node := newNode("node-123", map[string]resources.Quantity{"first": 100, "second": 200})
+	if !resources.IsZero(node.GetAllocatedResource()) {
+		t.Fatal("Failed to initialize resource")
+	}
+
+	// check foreign alloc that is larger than the node capacity (AddAllocation still tracks it as occupied)
+	large := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 200})
+	falloc1 := newForeignAllocation("foreign-1", "node-123", large)
+	node.AddAllocation(falloc1)
+	assert.Assert(t, resources.Equals(large, node.GetOccupiedResource()), "invalid occupied resource - expected %v got %v",
+		large, node.GetOccupiedResource())
+	assert.Assert(t, resources.Equals(resources.Zero, node.GetAllocatedResource()), "allocated resource is not zero: %v", node.GetAllocatedResource())
+	assert.Assert(t, resources.Equals(resources.Zero, node.GetUtilizedResource()), "utilized resource is not zero: %v", node.GetUtilizedResource())
+	assert.Assert(t, node.RemoveAllocation("foreign-1") != nil && resources.IsZero(node.GetOccupiedResource()), "removing foreign-1 should reset occupied resource, got %v", node.GetOccupiedResource())
+
+	// check foreign alloc that fits within the node capacity
+	half := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 50, "second": 100})
+	falloc2 := newForeignAllocation("foreign-2", "node-123", half)
+	node.AddAllocation(falloc2)
+	assert.Assert(t, resources.Equals(half, node.GetOccupiedResource()), "invalid occupied resource - expected %v got %v",
+		half, node.GetOccupiedResource())
+	assert.Assert(t, resources.Equals(resources.Zero, node.GetAllocatedResource()), "allocated resource is not zero: %v", node.GetAllocatedResource())
+	assert.Assert(t, resources.Equals(resources.Zero, node.GetUtilizedResource()), "utilized resource is not zero: %v", node.GetUtilizedResource())
+}
+
func TestTryAddAllocation(t *testing.T) {
node := newNode("node-123", map[string]resources.Quantity{"first": 100,
"second": 200})
if !resources.IsZero(node.GetAllocatedResource()) {
t.Fatal("Failed to initialize resource")
}
+ listener := &testListener{}
+ node.AddListener(listener)
// check nil alloc
assert.Assert(t, !node.TryAddAllocation(nil), "nil allocation should
not have been added: %v", node)
+ assert.Equal(t, 0, listener.updateCount, "unexpected listener update")
// check alloc that does not match
unknown :=
resources.NewResourceFromMap(map[string]resources.Quantity{"unknown": 1})
alloc := newAllocation(appID1, nodeID1, unknown)
assert.Assert(t, !node.TryAddAllocation(alloc), "unmatched resource
type in allocation should not have been added: %v", node)
+ assert.Equal(t, 0, listener.updateCount, "unexpected listener update")
// allocate half of the resources available and check the calculation
half :=
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 50,
"second": 100})
alloc = newAllocation(appID1, nodeID1, half)
assert.Assert(t, node.TryAddAllocation(alloc), "add allocation 1 should
not have failed")
+ assert.Equal(t, 1, listener.updateCount, "listener update expected")
if node.GetAllocation(alloc.GetAllocationKey()) == nil {
t.Fatal("failed to add allocations: allocation not returned")
}
@@ -448,6 +479,7 @@ func TestTryAddAllocation(t *testing.T) {
piece :=
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 25,
"second": 50})
alloc = newAllocation(appID1, nodeID1, piece)
assert.Assert(t, node.TryAddAllocation(alloc), "add allocation 2 should
not have failed")
+ assert.Equal(t, 2, listener.updateCount, "listener update expected")
if node.GetAllocation(alloc.GetAllocationKey()) == nil {
t.Fatal("failed to add allocations: allocation not returned")
}
@@ -465,16 +497,59 @@ func TestTryAddAllocation(t *testing.T) {
}
}
+func TestTryAddForeignAllocation(t *testing.T) {
+	node := newNode("node-123", map[string]resources.Quantity{"first": 100, "second": 200})
+	if !resources.IsZero(node.GetAllocatedResource()) {
+		t.Fatal("Failed to initialize resource")
+	}
+	listener := &testListener{}
+	node.AddListener(listener)
+
+	// first allocation - doesn't fit: it must be rejected and leave every resource untouched
+	fRes := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 200, "second": 200})
+	fAlloc1 := newForeignAllocation("foreign-1", "node-123", fRes)
+	assert.Assert(t, !node.TryAddAllocation(fAlloc1), "add allocation should have failed")
+	assert.Equal(t, 0, listener.updateCount, "unexpected listener update")
+	assert.Assert(t, resources.Equals(node.GetOccupiedResource(), resources.Zero), "occupied resource is not zero: %v", node.GetOccupiedResource())
+	assert.Assert(t, resources.Equals(node.GetAllocatedResource(), resources.Zero), "allocated resource is not zero: %v", node.GetAllocatedResource())
+	assert.Assert(t, resources.Equals(node.GetUtilizedResource(), resources.Zero), "utilized resource is not zero: %v", node.GetUtilizedResource())
+
+	// second allocation - fits: only the occupied resource grows and listeners are not notified
+	fRes = resources.NewResourceFromMap(map[string]resources.Quantity{"first": 20, "second": 20})
+	fAlloc2 := newForeignAllocation("foreign-2", "node-123", fRes)
+	assert.Assert(t, node.TryAddAllocation(fAlloc2), "add allocation should have succeeded")
+	assert.Equal(t, 0, listener.updateCount, "unexpected listener update")
+	assert.Assert(t, node.GetAllocation("foreign-2") != nil, "failed to add allocations: allocation foreign-2 not returned")
+	assert.Assert(t, resources.Equals(node.GetOccupiedResource(), fRes), "occupied resource calculation is invalid - expected %v got %v", fRes, node.GetOccupiedResource())
+	assert.Assert(t, resources.Equals(node.GetAllocatedResource(), resources.Zero), "allocated resource is not zero: %v", node.GetAllocatedResource())
+	assert.Assert(t, resources.Equals(node.GetUtilizedResource(), resources.Zero), "utilized resource is not zero: %v", node.GetUtilizedResource())
+
+	// third allocation - fits: occupied resource is the sum of both accepted foreign allocations
+	fRes = resources.NewResourceFromMap(map[string]resources.Quantity{"first": 25, "second": 35})
+	fAlloc3 := newForeignAllocation("foreign-3", "node-123", fRes)
+	assert.Assert(t, node.TryAddAllocation(fAlloc3), "add allocation should have succeeded")
+	assert.Equal(t, 0, listener.updateCount, "unexpected listener update")
+	assert.Assert(t, node.GetAllocation("foreign-3") != nil, "failed to add allocations: allocation foreign-3 not returned")
+	expectedOccupied := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 45, "second": 55})
+	assert.Assert(t, resources.Equals(node.GetOccupiedResource(), expectedOccupied), "occupied resource calculation is invalid - expected %v got %v",
+		expectedOccupied, node.GetOccupiedResource())
+	assert.Assert(t, resources.Equals(node.GetAllocatedResource(), resources.Zero), "allocated resource is not zero: %v", node.GetAllocatedResource())
+	assert.Assert(t, resources.Equals(node.GetUtilizedResource(), resources.Zero), "utilized resource is not zero: %v", node.GetUtilizedResource())
+}
+
func TestRemoveAllocation(t *testing.T) {
node := newNode("node-123", map[string]resources.Quantity{"first": 100,
"second": 200})
if !resources.IsZero(node.GetAllocatedResource()) {
t.Fatal("Failed to initialize resource")
}
+ listener := &testListener{}
+ node.AddListener(listener)
// allocate half of the resources available and check the calculation
half :=
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 50,
"second": 100})
alloc1 := newAllocation(appID1, nodeID1, half)
node.AddAllocation(alloc1)
+ assert.Equal(t, 1, listener.updateCount, "listener update expected")
if node.GetAllocation(alloc1.GetAllocationKey()) == nil {
t.Fatal("failed to add allocations: allocation not returned")
}
@@ -488,6 +563,8 @@ func TestRemoveAllocation(t *testing.T) {
if !resources.Equals(node.GetAllocatedResource(), half) {
t.Errorf("allocated resource not set correctly %v got %v",
half, node.GetAllocatedResource())
}
+ assert.Equal(t, 1, listener.updateCount, "unexpected listener update
for removal")
+ listener.updateCount = 0
// add second alloc and remove first check calculation
piece :=
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 25,
"second": 50})
@@ -497,6 +574,8 @@ func TestRemoveAllocation(t *testing.T) {
t.Fatal("failed to add allocations: allocation not returned")
}
alloc := node.RemoveAllocation(alloc1.GetAllocationKey())
+ assert.Equal(t, 2, listener.updateCount, "update expected for
add+removal")
+ listener.updateCount = 0
if alloc == nil {
t.Error("allocation should have been removed but was not")
}
@@ -511,6 +590,36 @@ func TestRemoveAllocation(t *testing.T) {
if !resources.Equals(node.GetUtilizedResource(),
expectedUtilizedResource) {
t.Errorf("failed to get utilized resources expected %v, got
%v", expectedUtilizedResource, node.GetUtilizedResource())
}
+ _ = node.RemoveAllocation(alloc2.GetAllocationKey())
+ assert.Equal(t, 1, listener.updateCount, "update expected for removal")
+ listener.updateCount = 0
+}
+
+func TestRemoveForeignAllocation(t *testing.T) {
+ node := newNode("node-123", map[string]resources.Quantity{"first": 100,
"second": 200})
+ if !resources.IsZero(node.GetAllocatedResource()) {
+ t.Fatal("Failed to initialize resource")
+ }
+ listener := &testListener{}
+ node.AddListener(listener)
+
+ fRes :=
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 11,
"second": 11})
+ fAlloc1 := newForeignAllocation("foreign-1", "node-123", fRes)
+ node.AddAllocation(fAlloc1)
+ assert.Assert(t, node.GetAllocation("foreign-1") != nil, "failed to add
allocations: allocation foreign-1 not returned")
+ fAlloc2 := newForeignAllocation("foreign-2", "node-123", fRes)
+ node.AddAllocation(fAlloc2)
+ assert.Assert(t, node.GetAllocation("foreign-2") != nil, "failed to add
allocations: allocation foreign-2 not returned")
+ node.RemoveAllocation("foreign-1")
+ assert.Equal(t, 0, listener.updateCount, "no update expected for
foreign allocation add+removal")
+ expectedOccupied :=
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 11,
"second": 11})
+ assert.Assert(t, resources.Equals(node.GetOccupiedResource(),
expectedOccupied), "occupied resource calculation is invalid - expected %v got
%v",
+ expectedOccupied, node.GetOccupiedResource())
+ expectedAvailable :=
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 89,
"second": 189})
+ assert.Assert(t, resources.Equals(node.GetAvailableResource(),
expectedAvailable), "available resource calculation is invalid - expected %v
got %v",
+ expectedAvailable, node.GetAvailableResource())
+ assert.Assert(t, resources.Equals(node.GetAllocatedResource(),
resources.Zero), "allocated resource calculation is invalid - expected zero got
%v",
+ node.GetAllocatedResource())
}
func TestNodeReplaceAllocation(t *testing.T) {
@@ -579,7 +688,7 @@ func TestGetAllocation(t *testing.T) {
}
}
-func TestGetAllocations(t *testing.T) {
+func TestGetAllAllocations(t *testing.T) {
node := newNode("node-123", map[string]resources.Quantity{"first": 100,
"second": 200})
if !resources.IsZero(node.GetAllocatedResource()) {
t.Fatal("Failed to initialize resource")
@@ -839,3 +948,41 @@ func TestPreconditions(t *testing.T) {
assert.NilError(t, err)
assert.Equal(t, 1, len(ask.allocLog))
}
+
+func TestGetAllocations(t *testing.T) {
+ node := newNode(nodeID1, map[string]resources.Quantity{"first": 100,
"second": 200})
+ allocRes :=
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10,
"second": 20})
+
+ // no allocations
+ assert.Equal(t, 0, len(node.GetForeignAllocations()))
+ assert.Equal(t, 0, len(node.GetYunikornAllocations()))
+
+ alloc1 := newAllocationWithKey(aKey, appID1, nodeID1, nil)
+ alloc2 := newAllocationWithKey(aKey2, appID1, nodeID1, nil)
+ alloc3 := newForeignAllocation(foreignAlloc1, nodeID1, allocRes)
+ alloc4 := newForeignAllocation(foreignAlloc2, nodeID1, allocRes)
+ node.AddAllocation(alloc1)
+ node.AddAllocation(alloc2)
+ node.AddAllocation(alloc3)
+ node.AddAllocation(alloc4)
+
+ t.Run("GetYunikornAllocations", func(t *testing.T) {
+ allocs := node.GetYunikornAllocations()
+ assert.Equal(t, 2, len(allocs))
+ m := map[string]bool{}
+ m[allocs[0].allocationKey] = true
+ m[allocs[1].allocationKey] = true
+ assert.Assert(t, m[aKey])
+ assert.Assert(t, m[aKey2])
+ })
+
+ t.Run("GetForeignAllocations", func(t *testing.T) {
+ allocs := node.GetForeignAllocations()
+ assert.Equal(t, 2, len(allocs))
+ m := map[string]bool{}
+ m[allocs[0].allocationKey] = true
+ m[allocs[1].allocationKey] = true
+ assert.Assert(t, m[foreignAlloc1])
+ assert.Assert(t, m[foreignAlloc2])
+ })
+}
diff --git a/pkg/scheduler/objects/utilities_test.go
b/pkg/scheduler/objects/utilities_test.go
index d12d644e..1f81bddf 100644
--- a/pkg/scheduler/objects/utilities_test.go
+++ b/pkg/scheduler/objects/utilities_test.go
@@ -34,20 +34,23 @@ import (
"github.com/apache/yunikorn-core/pkg/rmproxy"
schedEvt "github.com/apache/yunikorn-core/pkg/scheduler/objects/events"
"github.com/apache/yunikorn-core/pkg/scheduler/ugm"
+ siCommon "github.com/apache/yunikorn-scheduler-interface/lib/go/common"
"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
)
const (
- appID0 = "app-0"
- appID1 = "app-1"
- appID2 = "app-2"
- appID3 = "app-3"
- aKey = "alloc-1"
- aKey2 = "alloc-2"
- nodeID1 = "node-1"
- nodeID2 = "node-2"
- instType1 = "itype-1"
- testgroup = "testgroup"
+ appID0 = "app-0"
+ appID1 = "app-1"
+ appID2 = "app-2"
+ appID3 = "app-3"
+ aKey = "alloc-1"
+ aKey2 = "alloc-2"
+ nodeID1 = "node-1"
+ nodeID2 = "node-2"
+ instType1 = "itype-1"
+ testgroup = "testgroup"
+ foreignAlloc1 = "foreign-1"
+ foreignAlloc2 = "foreign-2"
)
// Create the root queue, base for all testing
@@ -224,6 +227,18 @@ func newAllocationWithKey(allocKey, appID, nodeID string,
res *resources.Resourc
return newAllocationAll(allocKey, appID, nodeID, "", res, false, 0)
}
+// Create a new foreign Allocation with a specified allocation key
+func newForeignAllocation(allocKey, nodeID string, res *resources.Resource)
*Allocation {
+ return NewAllocationFromSI(&si.Allocation{
+ AllocationKey: allocKey,
+ AllocationTags: map[string]string{
+ siCommon.Foreign: siCommon.AllocTypeDefault,
+ },
+ ResourcePerAlloc: res.ToProto(),
+ NodeID: nodeID,
+ })
+}
+
// Create a new Allocation with a random ask key
func newPlaceholderAlloc(appID, nodeID string, res *resources.Resource,
taskGroup string) *Allocation {
allocKey := strconv.FormatInt((time.Now()).UnixNano(), 10)
diff --git a/pkg/scheduler/partition.go b/pkg/scheduler/partition.go
index 1acd5af2..0db2dbda 100644
--- a/pkg/scheduler/partition.go
+++ b/pkg/scheduler/partition.go
@@ -65,6 +65,7 @@ type PartitionContext struct {
reservations int // number of
reservations
placeholderAllocations int // number of
placeholder allocations
preemptionEnabled bool // whether
preemption is enabled or not
+ foreignAllocs map[string]string //
allocKey-nodeID assignment of non-Yunikorn allocations
// The partition write lock must not be held while manipulating an
application.
// Scheduling is running continuously as a lock free background task.
Scheduling an application
@@ -94,6 +95,7 @@ func newPartitionContext(conf configs.PartitionConfig, rmID
string, cc *ClusterC
applications: make(map[string]*objects.Application),
completedApplications: make(map[string]*objects.Application),
nodes: objects.NewNodeCollection(conf.Name),
+ foreignAllocs: make(map[string]string),
}
pc.partitionManager = newPartitionManager(pc, cc)
if err := pc.initialPartitionFromConfig(conf); err != nil {
@@ -1131,12 +1133,17 @@ func (pc *PartitionContext) UpdateAllocation(alloc
*objects.Allocation) (request
allocationKey := alloc.GetAllocationKey()
applicationID := alloc.GetApplicationID()
nodeID := alloc.GetNodeID()
+ node := pc.GetNode(alloc.GetNodeID())
log.Log(log.SchedPartition).Info("processing allocation",
zap.String("partitionName", pc.Name),
zap.String("appID", applicationID),
zap.String("allocationKey", allocationKey))
+ if alloc.IsForeign() {
+ return pc.handleForeignAllocation(allocationKey, applicationID,
nodeID, node, alloc)
+ }
+
// find application
app := pc.getApplication(alloc.GetApplicationID())
if app == nil {
@@ -1146,10 +1153,8 @@ func (pc *PartitionContext) UpdateAllocation(alloc
*objects.Allocation) (request
queue := app.GetQueue()
// find node if one is specified
- var node *objects.Node = nil
allocated := alloc.IsAllocated()
if allocated {
- node = pc.GetNode(alloc.GetNodeID())
if node == nil {
metrics.GetSchedulerMetrics().IncSchedulingError()
return false, false, fmt.Errorf("failed to find node
%s", nodeID)
@@ -1286,12 +1291,54 @@ func (pc *PartitionContext) UpdateAllocation(alloc
*objects.Allocation) (request
return false, false, nil
}
+func (pc *PartitionContext) handleForeignAllocation(allocationKey,
applicationID, nodeID string, node *objects.Node, alloc *objects.Allocation)
(requestCreated bool, allocCreated bool, err error) {
+ allocated := alloc.IsAllocated()
+ if !allocated {
+ return false, false, fmt.Errorf("trying to add a foreign
request (non-allocation) %s", allocationKey)
+ }
+ if alloc.GetNodeID() == "" {
+ return false, false, fmt.Errorf("node ID is empty for
allocation %s", allocationKey)
+ }
+ if node == nil {
+ return false, false, fmt.Errorf("failed to find node %s for
allocation %s", nodeID, allocationKey)
+ }
+
+ existingNodeID := pc.getOrSetNodeIDForAlloc(allocationKey, nodeID)
+ if existingNodeID == "" {
+ log.Log(log.SchedPartition).Info("handling new foreign
allocation",
+ zap.String("partitionName", pc.Name),
+ zap.String("nodeID", nodeID),
+ zap.String("allocationKey", allocationKey))
+ node.AddAllocation(alloc)
+ return false, true, nil
+ }
+
+ log.Log(log.SchedPartition).Info("handling foreign allocation update",
+ zap.String("partitionName", pc.Name),
+ zap.String("appID", applicationID),
+ zap.String("allocationKey", allocationKey))
+ // this is a placeholder for eventual resource updates; nothing to do
yet
+ return false, false, nil
+}
+
func (pc *PartitionContext) convertUGI(ugi *si.UserGroupInformation, forced
bool) (security.UserGroup, error) {
pc.RLock()
defer pc.RUnlock()
return pc.userGroupCache.ConvertUGI(ugi, forced)
}
+// getOrSetNodeIDForAlloc returns the node ID already recorded for the foreign allocation allocKey; if none is recorded yet, it records nodeID for allocKey and returns "".
+func (pc *PartitionContext) getOrSetNodeIDForAlloc(allocKey, nodeID string) string {
+	pc.Lock()
+	defer pc.Unlock()
+	id := pc.foreignAllocs[allocKey]
+	if id != "" {
+		return id // already tracked: the mapping is left unchanged
+	}
+	pc.foreignAllocs[allocKey] = nodeID // first sighting: start tracking allocKey -> nodeID
+	return ""
+}
+
// calculate overall nodes resource usage and returns a map as the result,
// where the key is the resource name, e.g memory, and the value is a []int,
// which is a slice with 10 elements,
@@ -1373,6 +1420,10 @@ func (pc *PartitionContext) removeAllocation(release
*si.AllocationRelease) ([]*
}
appID := release.ApplicationID
allocationKey := release.GetAllocationKey()
+ if appID == "" {
+ pc.removeForeignAllocation(allocationKey)
+ return nil, nil
+ }
app := pc.getApplication(appID)
// no app nothing to do everything should already be clean
if app == nil {
@@ -1496,6 +1547,28 @@ func (pc *PartitionContext) removeAllocation(release
*si.AllocationRelease) ([]*
return released, confirmed
}
+func (pc *PartitionContext) removeForeignAllocation(allocID string) {
+ nodeID := pc.foreignAllocs[allocID]
+ if nodeID == "" {
+ log.Log(log.SchedPartition).Debug("Tried to remove a
non-existing foreign allocation",
+ zap.String("allocationID", allocID),
+ zap.String("nodeID", nodeID))
+ return
+ }
+ delete(pc.foreignAllocs, allocID)
+ node := pc.GetNode(nodeID)
+ if node == nil {
+ log.Log(log.SchedPartition).Debug("Node not found for foreign
allocation",
+ zap.String("allocationID", allocID),
+ zap.String("nodeID", nodeID))
+ return
+ }
+ log.Log(log.SchedPartition).Info("Removing foreign allocation",
+ zap.String("allocationID", allocID),
+ zap.String("nodeID", nodeID))
+ node.RemoveAllocation(allocID)
+}
+
func (pc *PartitionContext) GetCurrentState() string {
return pc.stateMachine.Current()
}
diff --git a/pkg/scheduler/partition_test.go b/pkg/scheduler/partition_test.go
index ed036502..a086d326 100644
--- a/pkg/scheduler/partition_test.go
+++ b/pkg/scheduler/partition_test.go
@@ -366,15 +366,15 @@ func TestCalculateNodesResourceUsage(t *testing.T) {
err = partition.AddNode(node)
assert.NilError(t, err)
- occupiedResources :=
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 50})
- alloc := newAllocation("key", "appID", nodeID1, occupiedResources)
+ res :=
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 50})
+ alloc := newAllocation("key", "appID", nodeID1, res)
node.AddAllocation(alloc)
usageMap := partition.calculateNodesResourceUsage()
assert.Equal(t, node.GetAvailableResource().Resources["first"],
resources.Quantity(50))
assert.Equal(t, usageMap["first"][4], 1)
- occupiedResources =
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 50})
- alloc = newAllocation("key", "appID", nodeID1, occupiedResources)
+ res =
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 50})
+ alloc = newAllocation("key", "appID", nodeID1, res)
node.AddAllocation(alloc)
usageMap = partition.calculateNodesResourceUsage()
assert.Equal(t, node.GetAvailableResource().Resources["first"],
resources.Quantity(0))
@@ -4689,3 +4689,56 @@ func
TestPlaceholderAllocationAndReplacementAfterRecovery(t *testing.T) {
assert.Equal(t, "real-alloc", confirmed.GetAllocationKey())
assert.Equal(t, "tg-1", confirmed.GetTaskGroup())
}
+
+// TestForeignAllocation checks the partition-level validation of foreign
+// allocations in UpdateAllocation: a foreign request without a node, an
+// allocation with an empty node ID, and an allocation on an unknown node are
+// all rejected without being tracked. It then adds a valid foreign allocation
+// and removes it again via removeAllocation with an empty application ID,
+// verifying both the partition tracking map and the node's allocations.
+func TestForeignAllocation(t *testing.T) {
+ setupUGM()
+ partition, err := newBasePartition()
+ assert.NilError(t, err, "partition create failed")
+ nodeRes :=
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10})
+ node1 := newNodeMaxResource(nodeID1, nodeRes)
+ err = partition.AddNode(node1)
+ assert.NilError(t, err)
+
+ // error: adding request (non-allocation)
+ req := newForeignRequest("foreign-nonalloc")
+ reqCreated, allocCreated, err := partition.UpdateAllocation(req)
+ assert.Assert(t, !reqCreated)
+ assert.Assert(t, !allocCreated)
+ assert.Error(t, err, "trying to add a foreign request (non-allocation)
foreign-nonalloc")
+
+ // error: empty node ID
+ req = newForeignAllocation(foreignAlloc1, "")
+ reqCreated, allocCreated, err = partition.UpdateAllocation(req)
+ assert.Assert(t, !reqCreated)
+ assert.Assert(t, !allocCreated)
+ assert.Error(t, err, "node ID is empty for allocation foreign-alloc-1")
+
+ // error: no node found
+ req = newForeignAllocation(foreignAlloc1, nodeID2)
+ reqCreated, allocCreated, err = partition.UpdateAllocation(req)
+ assert.Assert(t, !reqCreated)
+ assert.Assert(t, !allocCreated)
+ assert.Error(t, err, "failed to find node node-2 for allocation
foreign-alloc-1")
+ assert.Equal(t, 0, len(partition.foreignAllocs))
+
+ // add new allocation
+ req = newForeignAllocation(foreignAlloc1, nodeID1)
+ reqCreated, allocCreated, err = partition.UpdateAllocation(req)
+ assert.Assert(t, !reqCreated)
+ assert.Assert(t, allocCreated)
+ assert.NilError(t, err)
+ assert.Equal(t, 1, len(partition.foreignAllocs))
+ assert.Equal(t, nodeID1, partition.foreignAllocs[foreignAlloc1])
+ assert.Equal(t, 1, len(node1.GetAllAllocations()))
+ assert.Assert(t, node1.GetAllocation(foreignAlloc1) != nil)
+
+ // remove allocation
+ released, confirmed := partition.removeAllocation(&si.AllocationRelease{
+ AllocationKey: foreignAlloc1,
+ })
+ assert.Assert(t, released == nil)
+ assert.Assert(t, confirmed == nil)
+ assert.Equal(t, 0, len(partition.foreignAllocs))
+ assert.Equal(t, 0, len(node1.GetAllAllocations()))
+ assert.Assert(t, node1.GetAllocation(foreignAlloc1) == nil)
+}
diff --git a/pkg/scheduler/tests/operation_test.go
b/pkg/scheduler/tests/operation_test.go
index 98e3e742..2245697b 100644
--- a/pkg/scheduler/tests/operation_test.go
+++ b/pkg/scheduler/tests/operation_test.go
@@ -654,3 +654,111 @@ partitions:
assert.Equal(t,
int64(schedulingNode1.GetAllocatedResource().Resources[common.Memory]),
int64(0))
assert.Equal(t,
int64(schedulingNode1.GetAvailableResource().Resources[common.Memory]),
int64(20))
}
+
+// TestForeignPodResourceUsage verifies that a foreign allocation (tagged with
+// common.Foreign) placed on a node via UpdateAllocation is reported as occupied
+// resource: it lowers the node's available resource without changing its
+// allocated resource. Releasing it must restore the original values.
+func TestForeignPodResourceUsage(t *testing.T) {
+ // Register RM
+ configData := `
+partitions:
+ -
+ name: default
+ queues:
+ - name: root
+ submitacl: "*"
+ queues:
+ - name: a
+ resources:
+ max:
+ memory: 150
+ vcore: 20
+`
+ // start the mock scheduler with the config above
+ ms := &mockScheduler{}
+ defer ms.Stop()
+
+ err := ms.Init(configData, false, false)
+ assert.NilError(t, err, "RegisterResourceManager failed")
+
+ // no node has been registered yet, so the partition has no total resource
+ partitionInfo :=
ms.scheduler.GetClusterContext().GetPartition("[rm:123]default")
+ assert.Assert(t, partitionInfo.GetTotalPartitionResource() == nil,
"partition info max resource nil")
+
+ // Register a node
+ err = ms.proxy.UpdateNode(&si.NodeRequest{
+ Nodes: []*si.NodeInfo{
+ {
+ NodeID: "node-1:1234",
+ Attributes: map[string]string{},
+ SchedulableResource: &si.Resource{
+ Resources: map[string]*si.Quantity{
+ "memory": {Value: 100},
+ "vcore": {Value: 10},
+ },
+ },
+ Action: si.NodeInfo_CREATE,
+ },
+ },
+ RmID: "rm:123",
+ })
+
+ assert.NilError(t, err, "NodeRequest failed")
+
+ // Wait until node is registered
+ context := ms.scheduler.GetClusterContext()
+ ms.mockRM.waitForAcceptedNode(t, "node-1:1234", 1000)
+ waitForNewNode(t, context, "node-1:1234", "[rm:123]default", 1000)
+
+ assert.Equal(t, len(partitionInfo.GetNodes()), 1)
+ node1 := partitionInfo.GetNode("node-1:1234")
+ assert.Equal(t, int64(node1.GetCapacity().Resources[common.Memory]),
int64(100))
+ schedulingNode1 := ms.scheduler.GetClusterContext().
+ GetNode("node-1:1234", "[rm:123]default")
+ assert.Equal(t,
int64(schedulingNode1.GetAllocatedResource().Resources[common.Memory]),
int64(0))
+ assert.Equal(t,
int64(schedulingNode1.GetAvailableResource().Resources[common.Memory]),
int64(100))
+
+ // add a foreign allocation on the node: it must be counted as occupied
+ // resource, not as allocated resource
+ res := resources.NewResourceFromMap(map[string]resources.Quantity{
+ "memory": 80, "vcore": 5})
+ err = ms.proxy.UpdateAllocation(&si.AllocationRequest{
+ Allocations: []*si.Allocation{
+ {
+ AllocationKey: "foreignpod-1",
+ ResourcePerAlloc: res.ToProto(),
+ AllocationTags: map[string]string{
+ common.Foreign: common.AllocTypeDefault,
+ },
+ NodeID: "node-1:1234",
+ },
+ },
+ RmID: "rm:123",
+ })
+ assert.NilError(t, err, "UpdateAllocation failed")
+ waitForAvailableNodeResource(t, ms.scheduler.GetClusterContext(),
"[rm:123]default",
+ []string{"node-1:1234"}, 20, 1000)
+ assert.Equal(t, int64(node1.GetCapacity().Resources[common.Memory]),
int64(100))
+ assert.Equal(t, int64(node1.GetCapacity().Resources[common.CPU]),
int64(10))
+ assert.Equal(t,
int64(node1.GetOccupiedResource().Resources[common.Memory]), int64(80))
+ assert.Equal(t,
int64(node1.GetOccupiedResource().Resources[common.CPU]), int64(5))
+ assert.Equal(t,
int64(schedulingNode1.GetAllocatedResource().Resources[common.Memory]),
int64(0))
+ assert.Equal(t,
int64(schedulingNode1.GetAvailableResource().Resources[common.Memory]),
int64(20))
+
+ // release the foreign allocation: occupied resource must drop back to zero
+ err = ms.proxy.UpdateAllocation(&si.AllocationRequest{
+ Releases: &si.AllocationReleasesRequest{
+ AllocationsToRelease: []*si.AllocationRelease{
+ {
+ AllocationKey: "foreignpod-1",
+ },
+ },
+ },
+ RmID: "rm:123",
+ })
+ assert.NilError(t, err)
+ waitForAvailableNodeResource(t, ms.scheduler.GetClusterContext(),
"[rm:123]default",
+ []string{"node-1:1234"}, 100, 1000)
+ assert.Equal(t, int64(node1.GetCapacity().Resources[common.Memory]),
int64(100))
+ assert.Equal(t, int64(node1.GetCapacity().Resources[common.CPU]),
int64(10))
+ assert.Equal(t,
int64(node1.GetOccupiedResource().Resources[common.Memory]), int64(0))
+ assert.Equal(t,
int64(node1.GetOccupiedResource().Resources[common.CPU]), int64(0))
+ assert.Equal(t,
int64(schedulingNode1.GetAllocatedResource().Resources[common.Memory]),
int64(0))
+ assert.Equal(t,
int64(schedulingNode1.GetAvailableResource().Resources[common.Memory]),
int64(100))
+}
diff --git a/pkg/scheduler/utilities_test.go b/pkg/scheduler/utilities_test.go
index a87b26bd..56845fd9 100644
--- a/pkg/scheduler/utilities_test.go
+++ b/pkg/scheduler/utilities_test.go
@@ -31,6 +31,7 @@ import (
"github.com/apache/yunikorn-core/pkg/scheduler/objects"
"github.com/apache/yunikorn-core/pkg/scheduler/ugm"
"github.com/apache/yunikorn-core/pkg/webservice/dao"
+ siCommon "github.com/apache/yunikorn-scheduler-interface/lib/go/common"
"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
)
@@ -52,6 +53,7 @@ const (
allocKey3 = "alloc-3"
maxresources = "maxresources"
maxapplications = "maxapplications"
+ foreignAlloc1 = "foreign-alloc-1"
)
func newBasePartitionNoRootDefault() (*PartitionContext, error) {
@@ -599,6 +601,36 @@ func newAllocationAll(allocKey, appID, nodeID, taskGroup
string, res *resources.
})
}
+// newForeignRequest builds a foreign-tagged allocation object with the given
+// key and no node ID, so the partition sees it as a request rather than as a
+// placed allocation.
+func newForeignRequest(allocKey string) *objects.Allocation {
+ tags := map[string]string{siCommon.Foreign: siCommon.AllocTypeDefault}
+ proto := &si.Allocation{
+ AllocationKey: allocKey,
+ AllocationTags: tags,
+ }
+ return objects.NewAllocationFromSI(proto)
+}
+
+func newForeignAllocation(allocKey, nodeID string) *objects.Allocation {
+ var alloc *objects.Allocation
+ defer func() {
+ if nodeID == "" {
+ alloc.SetNodeID("")
+ }
+ }()
+ id := nodeID
+ if nodeID == "" {
+ id = "tmp" // set a temporary value to force allocated = true
+ }
+ alloc = objects.NewAllocationFromSI(&si.Allocation{
+ AllocationKey: allocKey,
+ AllocationTags: map[string]string{
+ siCommon.Foreign: siCommon.AllocTypeDefault,
+ },
+ NodeID: id,
+ })
+ return alloc
+}
+
func newAllocationAskPreempt(allocKey, appID string, prio int32, res
*resources.Resource) *objects.Allocation {
return objects.NewAllocationFromSI(&si.Allocation{
AllocationKey: allocKey,
@@ -614,6 +646,7 @@ func newAllocationAskPreempt(allocKey, appID string, prio
int32, res *resources.
},
})
}
+
func newNodeWithResources(nodeID string, max, occupied *resources.Resource)
*objects.Node {
proto := &si.NodeInfo{
NodeID: nodeID,
diff --git a/pkg/webservice/dao/allocation_info.go
b/pkg/webservice/dao/allocation_info.go
index 1aa4b587..e399cbdf 100644
--- a/pkg/webservice/dao/allocation_info.go
+++ b/pkg/webservice/dao/allocation_info.go
@@ -34,3 +34,12 @@ type AllocationDAOInfo struct {
Preempted bool `json:"preempted,omitempty"`
Originator bool `json:"originator,omitempty"`
}
+
+// ForeignAllocationDAOInfo is the JSON representation of a foreign allocation
+// (one not placed by the YuniKorn scheduler) that is present on a node. Such
+// allocations are listed per node, separately from the regular allocations.
+// AllocationTime is filled by the web service handler from the allocation's
+// creation time in Unix nanoseconds; Priority is the numeric priority
+// rendered as a string.
+type ForeignAllocationDAOInfo struct {
+ AllocationKey string `json:"allocationKey"` // no
omitempty, allocation key should not be empty
+ AllocationTime int64 `json:"allocationTime,omitempty"`
+ ResourcePerAlloc map[string]int64 `json:"resource,omitempty"`
+ Priority string `json:"priority,omitempty"`
+ NodeID string `json:"nodeId,omitempty"`
+ Preemptable bool `json:"preemptable,omitempty"`
+}
diff --git a/pkg/webservice/dao/node_info.go b/pkg/webservice/dao/node_info.go
index 27fd7d58..7d75c12d 100644
--- a/pkg/webservice/dao/node_info.go
+++ b/pkg/webservice/dao/node_info.go
@@ -24,17 +24,18 @@ type NodesDAOInfo struct {
}
+// NodeDAOInfo is the JSON representation of a scheduler node returned by the
+// web service.
type NodeDAOInfo struct {
- NodeID string `json:"nodeID"` // no omitempty, node
id should not be empty
- HostName string `json:"hostName,omitempty"`
- RackName string `json:"rackName,omitempty"`
- Attributes map[string]string `json:"attributes,omitempty"`
- Capacity map[string]int64 `json:"capacity,omitempty"`
- Allocated map[string]int64 `json:"allocated,omitempty"`
- Occupied map[string]int64 `json:"occupied,omitempty"`
- Available map[string]int64 `json:"available,omitempty"`
- Utilized map[string]int64 `json:"utilized,omitempty"`
- Allocations []*AllocationDAOInfo `json:"allocations,omitempty"`
- Schedulable bool `json:"schedulable"` // no omitempty,
a false value gives a quick way to understand whether a node is schedulable.
- IsReserved bool `json:"isReserved"` // no omitempty,
a false value gives a quick way to understand whether a node is reserved.
- Reservations []string `json:"reservations,omitempty"`
+ NodeID string `json:"nodeID"` // no
omitempty, node id should not be empty
+ HostName string
`json:"hostName,omitempty"`
+ RackName string
`json:"rackName,omitempty"`
+ Attributes map[string]string
`json:"attributes,omitempty"`
+ Capacity map[string]int64
`json:"capacity,omitempty"`
+ Allocated map[string]int64
`json:"allocated,omitempty"`
+ Occupied map[string]int64
`json:"occupied,omitempty"`
+ Available map[string]int64
`json:"available,omitempty"`
+ Utilized map[string]int64
`json:"utilized,omitempty"`
+ // Allocations holds only the allocations made by YuniKorn on this node;
+ // foreign allocations are reported in ForeignAllocations.
+ Allocations []*AllocationDAOInfo
`json:"allocations,omitempty"`
+ // ForeignAllocations lists the foreign (non-YuniKorn) allocations on this node.
+ ForeignAllocations []*ForeignAllocationDAOInfo
`json:"foreignAllocations,omitempty"`
+ Schedulable bool `json:"schedulable"` //
no omitempty, a false value gives a quick way to understand whether a node is
schedulable.
+ IsReserved bool `json:"isReserved"` //
no omitempty, a false value gives a quick way to understand whether a node is
reserved.
+ Reservations []string
`json:"reservations,omitempty"`
}
diff --git a/pkg/webservice/handlers.go b/pkg/webservice/handlers.go
index 49309e70..8e455a71 100644
--- a/pkg/webservice/handlers.go
+++ b/pkg/webservice/handlers.go
@@ -254,6 +254,19 @@ func getAllocationDAO(alloc *objects.Allocation)
*dao.AllocationDAOInfo {
return allocDAO
}
+// getForeignAllocationDAO converts a foreign allocation into its web service
+// representation. The allocation time is the creation time in Unix
+// nanoseconds, and the priority is rendered as a decimal string.
+func getForeignAllocationDAO(alloc *objects.Allocation) *dao.ForeignAllocationDAOInfo {
+ created := alloc.GetCreateTime()
+ return &dao.ForeignAllocationDAOInfo{
+ AllocationKey: alloc.GetAllocationKey(),
+ AllocationTime: created.UnixNano(),
+ ResourcePerAlloc: alloc.GetAllocatedResource().DAOMap(),
+ Priority: strconv.Itoa(int(alloc.GetPriority())),
+ NodeID: alloc.GetNodeID(),
+ Preemptable: alloc.IsPreemptable(),
+ }
+}
+
func getAllocationsDAO(allocations []*objects.Allocation)
[]*dao.AllocationDAOInfo {
allocsDAO := make([]*dao.AllocationDAOInfo, 0, len(allocations))
for _, alloc := range allocations {
@@ -262,6 +275,14 @@ func getAllocationsDAO(allocations []*objects.Allocation)
[]*dao.AllocationDAOIn
return allocsDAO
}
+// getForeignAllocationsDAO converts every given foreign allocation into its
+// web service representation, preserving order. The result is never nil.
+func getForeignAllocationsDAO(allocations []*objects.Allocation) []*dao.ForeignAllocationDAOInfo {
+ result := make([]*dao.ForeignAllocationDAOInfo, len(allocations))
+ for i, alloc := range allocations {
+ result[i] = getForeignAllocationDAO(alloc)
+ }
+ return result
+}
+
func getPlaceholderDAO(ph *objects.PlaceholderData) *dao.PlaceholderDAOInfo {
phDAO := &dao.PlaceholderDAOInfo{
TaskGroupName: ph.TaskGroupName,
@@ -375,19 +396,20 @@ func getAllocationAsksDAO(asks []*objects.Allocation)
[]*dao.AllocationAskDAOInf
+// getNodeDAO converts a node into its web service representation. Allocations
+// made by YuniKorn and foreign allocations are reported in separate lists.
func getNodeDAO(node *objects.Node) *dao.NodeDAOInfo {
return &dao.NodeDAOInfo{
- NodeID: node.NodeID,
- HostName: node.Hostname,
- RackName: node.Rackname,
- Attributes: node.GetAttributes(),
- Capacity: node.GetCapacity().DAOMap(),
- Occupied: node.GetOccupiedResource().DAOMap(),
- Allocated: node.GetAllocatedResource().DAOMap(),
- Available: node.GetAvailableResource().DAOMap(),
- Utilized: node.GetUtilizedResource().DAOMap(),
- Allocations: getAllocationsDAO(node.GetAllAllocations()),
- Schedulable: node.IsSchedulable(),
- IsReserved: node.IsReserved(),
- Reservations: node.GetReservationKeys(),
+ NodeID: node.NodeID,
+ HostName: node.Hostname,
+ RackName: node.Rackname,
+ Attributes: node.GetAttributes(),
+ Capacity: node.GetCapacity().DAOMap(),
+ Occupied: node.GetOccupiedResource().DAOMap(),
+ Allocated: node.GetAllocatedResource().DAOMap(),
+ Available: node.GetAvailableResource().DAOMap(),
+ Utilized: node.GetUtilizedResource().DAOMap(),
+ Allocations:
getAllocationsDAO(node.GetYunikornAllocations()),
+ ForeignAllocations:
getForeignAllocationsDAO(node.GetForeignAllocations()),
+ Schedulable: node.IsSchedulable(),
+ IsReserved: node.IsReserved(),
+ Reservations: node.GetReservationKeys(),
}
}
diff --git a/pkg/webservice/handlers_test.go b/pkg/webservice/handlers_test.go
index b9e4d088..bba72166 100644
--- a/pkg/webservice/handlers_test.go
+++ b/pkg/webservice/handlers_test.go
@@ -1407,18 +1407,24 @@ func TestGetPartitionNode(t *testing.T) {
resAlloc1 :=
resources.NewResourceFromMap(map[string]resources.Quantity{siCommon.Memory:
500, siCommon.CPU: 300})
resAlloc2 :=
resources.NewResourceFromMap(map[string]resources.Quantity{siCommon.Memory:
300, siCommon.CPU: 500})
alloc1 := newAlloc("alloc-1", appID, node1ID, resAlloc1)
- allocs := []*objects.Allocation{alloc1}
err = partition.AddNode(node1)
assert.NilError(t, err, "add node to partition should not have failed")
- _, allocCreated, err := partition.UpdateAllocation(allocs[0])
+ _, allocCreated, err := partition.UpdateAllocation(alloc1)
assert.NilError(t, err, "add alloc-1 should not have failed")
assert.Check(t, allocCreated)
+ falloc1 := newForeignAlloc("foreign-1", "", node1ID, resAlloc1,
siCommon.AllocTypeDefault, 0)
+ _, allocCreated, err = partition.UpdateAllocation(falloc1)
+ assert.NilError(t, err, "add falloc-1 should not have failed")
+ assert.Check(t, allocCreated)
+ falloc2 := newForeignAlloc("foreign-2", "", node1ID, resAlloc2,
siCommon.AllocTypeStatic, 123)
+ _, allocCreated, err = partition.UpdateAllocation(falloc2)
+ assert.NilError(t, err, "add falloc-2 should not have failed")
+ assert.Check(t, allocCreated)
alloc2 := newAlloc("alloc-2", appID, node2ID, resAlloc2)
- allocs = []*objects.Allocation{alloc2}
err = partition.AddNode(node2)
assert.NilError(t, err, "add node to partition should not have failed")
- _, allocCreated, err = partition.UpdateAllocation(allocs[0])
+ _, allocCreated, err = partition.UpdateAllocation(alloc2)
assert.NilError(t, err, "add alloc-2 should not have failed")
assert.Check(t, allocCreated)
@@ -1434,6 +1440,14 @@ func TestGetPartitionNode(t *testing.T) {
err = json.Unmarshal(resp.outputBytes, &nodeInfo)
assert.NilError(t, err, unmarshalError)
assertNodeInfo(t, &nodeInfo, node1ID, "alloc-1", attributesOfnode1,
map[string]int64{"memory": 50, "vcore": 30})
+ assert.Equal(t, 2, len(nodeInfo.ForeignAllocations))
+ if nodeInfo.ForeignAllocations[0].AllocationKey == "foreign-1" {
+ assertForeignAllocation(t, "foreign-1", "0", node1ID,
resAlloc1, true, nodeInfo.ForeignAllocations[0])
+ assertForeignAllocation(t, "foreign-2", "123", node1ID,
resAlloc2, false, nodeInfo.ForeignAllocations[1])
+ } else {
+ assertForeignAllocation(t, "foreign-1", "0", node1ID,
resAlloc1, true, nodeInfo.ForeignAllocations[1])
+ assertForeignAllocation(t, "foreign-2", "123", node1ID,
resAlloc2, false, nodeInfo.ForeignAllocations[0])
+ }
// Test node id is missing
req, err = createRequest(t, "/ws/v1/partition/default/node/node_1",
map[string]string{"partition": "default", "node": ""})
@@ -1464,6 +1478,20 @@ func assertNodeInfo(t *testing.T, node *dao.NodeDAOInfo,
expectedID string, expe
assert.DeepEqual(t, expectedUtilized, node.Utilized)
}
+// assertForeignAllocation checks key, priority, node ID, resources and the
+// preemptable flag of a ForeignAllocationDAOInfo (the allocation time is not
+// checked). The DAO resource map is converted back into a Resource so it can
+// be compared with expectedRes.
+func assertForeignAllocation(t *testing.T, key, priority, nodeID string, expectedRes *resources.Resource, preemptable bool, info *dao.ForeignAllocationDAOInfo) {
+ t.Helper()
+ assert.Equal(t, key, info.AllocationKey)
+ assert.Equal(t, priority, info.Priority)
+ assert.Equal(t, nodeID, info.NodeID)
+ quantities := make(map[string]resources.Quantity, len(info.ResourcePerAlloc))
+ for name, value := range info.ResourcePerAlloc {
+ quantities[name] = resources.Quantity(value)
+ }
+ actualRes := resources.NewResourceFromMap(quantities)
+ assert.Assert(t, resources.Equals(actualRes, expectedRes))
+ assert.Equal(t, preemptable, info.Preemptable)
+}
+
// addApp Add app to the given partition and assert the app count, state etc
func addApp(t *testing.T, id string, part *scheduler.PartitionContext,
queueName string, isCompleted bool) *objects.Application {
return addAppWithUserGroup(t, id, part, queueName, isCompleted,
security.UserGroup{})
@@ -2991,3 +3019,15 @@ func newAlloc(allocationKey string, appID string, nodeID
string, resAlloc *resou
ResourcePerAlloc: resAlloc.ToProto(),
})
}
+
+// newForeignAlloc builds an allocation on nodeID with the given resources and
+// priority, tagged as foreign with the given type (fType). The appID argument
+// is currently unused: the resulting allocation carries no application ID.
+func newForeignAlloc(allocationKey string, appID string, nodeID string, resAlloc *resources.Resource, fType string, priority int32) *objects.Allocation {
+ proto := &si.Allocation{
+ AllocationKey: allocationKey,
+ NodeID: nodeID,
+ ResourcePerAlloc: resAlloc.ToProto(),
+ AllocationTags: map[string]string{siCommon.Foreign: fType},
+ Priority: priority,
+ }
+ return objects.NewAllocationFromSI(proto)
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]