This is an automated email from the ASF dual-hosted git repository.
pbacsko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git
The following commit(s) were added to refs/heads/master by this push:
new 982bc48f [YUNIKORN-2562] Nil pointer panic in Application.ReplaceAllocation() (#846)
982bc48f is described below
commit 982bc48f890bc786e2edb450b67f6952e68a98f3
Author: Peter Bacsko <[email protected]>
AuthorDate: Mon Apr 22 11:19:40 2024 +0200
[YUNIKORN-2562] Nil pointer panic in Application.ReplaceAllocation() (#846)
Closes: #846
Signed-off-by: Peter Bacsko <[email protected]>
---
pkg/scheduler/objects/application.go | 35 +++---
pkg/scheduler/objects/application_test.go | 13 +++
pkg/scheduler/partition_test.go | 66 +++++++++++
pkg/scheduler/tests/mock_rm_callback_test.go | 31 +++++
pkg/scheduler/tests/recovery_test.go | 165 +++++++++++++++++++++++++++
5 files changed, 291 insertions(+), 19 deletions(-)
diff --git a/pkg/scheduler/objects/application.go b/pkg/scheduler/objects/application.go
index 74f7c5dc..e6940680 100644
--- a/pkg/scheduler/objects/application.go
+++ b/pkg/scheduler/objects/application.go
@@ -638,25 +638,13 @@ func (sa *Application) AddAllocationAsk(ask
*AllocationAsk) error {
zap.Error(err))
}
}
- sa.requests[ask.GetAllocationKey()] = ask
-
- // update app priority
- repeat := ask.GetPendingAskRepeat()
- priority := ask.GetPriority()
- if repeat > 0 && priority > sa.askMaxPriority {
- sa.askMaxPriority = priority
- sa.queue.UpdateApplicationPriority(sa.ApplicationID,
sa.askMaxPriority)
- }
+ sa.addAllocationAskInternal(ask)
// Update total pending resource
delta.SubFrom(oldAskResource)
sa.pending = resources.Add(sa.pending, delta)
sa.queue.incPendingResource(delta)
- if ask.IsPlaceholder() {
- sa.addPlaceholderData(ask)
- }
-
log.Log(log.SchedApplication).Info("ask added successfully to
application",
zap.String("appID", sa.ApplicationID),
zap.String("user", sa.user.User),
@@ -678,6 +666,19 @@ func (sa *Application) RecoverAllocationAsk(ask
*AllocationAsk) {
if ask == nil {
return
}
+
+ sa.addAllocationAskInternal(ask)
+
+ // progress the application from New to Accepted.
+ if sa.IsNew() {
+ if err := sa.HandleApplicationEvent(RunApplication); err != nil
{
+ log.Log(log.SchedApplication).Debug("Application state
change failed while recovering allocation ask",
+ zap.Error(err))
+ }
+ }
+}
+
+func (sa *Application) addAllocationAskInternal(ask *AllocationAsk) {
sa.requests[ask.GetAllocationKey()] = ask
// update app priority
@@ -688,12 +689,8 @@ func (sa *Application) RecoverAllocationAsk(ask
*AllocationAsk) {
sa.queue.UpdateApplicationPriority(sa.ApplicationID,
sa.askMaxPriority)
}
- // progress the application from New to Accepted.
- if sa.IsNew() {
- if err := sa.HandleApplicationEvent(RunApplication); err != nil
{
- log.Log(log.SchedApplication).Debug("Application state
change failed while recovering allocation ask",
- zap.Error(err))
- }
+ if ask.IsPlaceholder() {
+ sa.addPlaceholderData(ask)
}
}
diff --git a/pkg/scheduler/objects/application_test.go b/pkg/scheduler/objects/application_test.go
index ad0dc58c..2300d729 100644
--- a/pkg/scheduler/objects/application_test.go
+++ b/pkg/scheduler/objects/application_test.go
@@ -524,6 +524,19 @@ func TestRecoverAllocAsk(t *testing.T) {
assert.Equal(t, len(app.requests), 2, "ask should have been added,
total should be 2")
assert.Assert(t, app.IsAccepted(), "Application should have stayed in
accepted state")
assertUserGroupResource(t, getTestUserGroup(), nil)
+
+ assert.Equal(t, 0, len(app.placeholderData))
+ ask = newAllocationAskTG("ask-3", appID1, "testGroup", res, 1)
+ app.RecoverAllocationAsk(ask)
+ phData := app.placeholderData
+ assert.Equal(t, 1, len(phData))
+ taskGroupData := phData["testGroup"]
+ assert.Assert(t, taskGroupData != nil)
+ assert.Equal(t, "testGroup", taskGroupData.TaskGroupName)
+ assert.Equal(t, int64(1), taskGroupData.Count)
+ assert.Equal(t, int64(0), taskGroupData.Replaced)
+ assert.Equal(t, int64(0), taskGroupData.TimedOut)
+ assert.Assert(t, resources.Equals(taskGroupData.MinResource, res))
}
// test reservations removal by allocation
diff --git a/pkg/scheduler/partition_test.go b/pkg/scheduler/partition_test.go
index 3baa40cc..7b4d0583 100644
--- a/pkg/scheduler/partition_test.go
+++ b/pkg/scheduler/partition_test.go
@@ -4423,3 +4423,69 @@ func TestCalculateOutstandingRequests(t *testing.T) {
assert.Equal(t, 3, len(requests))
assert.Assert(t, resources.Equals(expectedTotal, total), "total
resource expected: %v, got: %v", expectedTotal, total)
}
+
+func TestPlaceholderAllocationAndReplacementAfterRecovery(t *testing.T) {
+ // verify the following (YUNIKORN-2562):
+ // 1. Have a recovered, existing PH allocation (ph-1) from a node with
task group "tg-1"
+ // 2. Have a new PH ask (ph-2) with task group "tg-2"
+ // 3. Have a real ask with task group "tg-1"
+ // 4. EXPECTED: successful allocation for the pending ask (ph-2)
+ // 5. EXPECTED: successful placeholder allocation (replacement)
+ // 6. EXPECTED: successful removal of ph-1 allocation
+ setupUGM()
+ partition, err := newBasePartition()
+ assert.NilError(t, err, "partition create failed")
+
+ // add a new app
+ app := newApplication(appID1, "default", defQueue)
+ err = partition.AddApplication(app)
+ assert.NilError(t, err, "add application to partition should not have
failed")
+
+ // add a node with allocation
+ nodeRes :=
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10})
+ node1 := newNodeMaxResource(nodeID1, nodeRes)
+ appRes :=
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 1})
+ phAsk := newAllocationAskTG("placeholder", appID1, taskGroup, appRes,
true)
+ ph := objects.NewAllocation(nodeID1, phAsk)
+ allocs := []*objects.Allocation{ph}
+ err = partition.AddNode(node1, allocs)
+ assert.NilError(t, err)
+
+ // add a placeholder ask with a different taskgroup
+ phAsk2 := newAllocationAskTG("placeholder2", appID1, "tg-2", appRes,
true)
+ err = app.AddAllocationAsk(phAsk2)
+ assert.NilError(t, err, "failed to add placeholder ask")
+
+ realAsk := newAllocationAskTG("real-alloc", appID1, taskGroup, appRes,
false)
+ err = app.AddAllocationAsk(realAsk)
+ assert.NilError(t, err, "failed to add real ask")
+
+ // get an allocation for "placeholder2"
+ alloc := partition.tryAllocate()
+ assert.Assert(t, alloc != nil, "no allocation occurred")
+ assert.Equal(t, objects.Allocated, alloc.GetResult())
+ assert.Equal(t, "placeholder2", alloc.GetAllocationKey())
+ assert.Equal(t, "tg-2", alloc.GetTaskGroup())
+ assert.Equal(t, "node-1", alloc.GetNodeID())
+
+ // real allocation gets replaced
+ alloc = partition.tryPlaceholderAllocate()
+ assert.Assert(t, alloc != nil, "no placeholder replacement occurred")
+ assert.Equal(t, objects.Replaced, alloc.GetResult())
+ assert.Equal(t, "real-alloc", alloc.GetAllocationKey())
+ assert.Equal(t, "tg-1", alloc.GetTaskGroup())
+ assert.Equal(t, "real-alloc-0", alloc.GetAllocationID())
+
+ // remove the terminated placeholder allocation
+ released, confirmed := partition.removeAllocation(&si.AllocationRelease{
+ ApplicationID: appID1,
+ TerminationType: si.TerminationType_PLACEHOLDER_REPLACED,
+ AllocationKey: "real-alloc-0",
+ AllocationID: "placeholder-0",
+ })
+ assert.Assert(t, released == nil, "unexpected released allocation")
+ assert.Assert(t, confirmed != nil, "expected to have a confirmed
allocation")
+ assert.Equal(t, "real-alloc", confirmed.GetAllocationKey())
+ assert.Equal(t, "tg-1", confirmed.GetTaskGroup())
+ assert.Equal(t, "real-alloc-0", confirmed.GetAllocationID())
+}
diff --git a/pkg/scheduler/tests/mock_rm_callback_test.go b/pkg/scheduler/tests/mock_rm_callback_test.go
index f9212a00..a908683d 100644
--- a/pkg/scheduler/tests/mock_rm_callback_test.go
+++ b/pkg/scheduler/tests/mock_rm_callback_test.go
@@ -38,6 +38,8 @@ type mockRMCallback struct {
rejectedNodes map[string]bool
nodeAllocations map[string][]*si.Allocation
Allocations map[string]*si.Allocation
+ releasedPhs map[string]*si.AllocationRelease
+ appStates map[string]string
locking.RWMutex
}
@@ -50,6 +52,8 @@ func newMockRMCallbackHandler() *mockRMCallback {
rejectedNodes: make(map[string]bool),
nodeAllocations: make(map[string][]*si.Allocation),
Allocations: make(map[string]*si.Allocation),
+ releasedPhs: make(map[string]*si.AllocationRelease),
+ appStates: make(map[string]string),
}
}
@@ -63,6 +67,10 @@ func (m *mockRMCallback) UpdateApplication(response
*si.ApplicationResponse) err
for _, app := range response.Rejected {
m.rejectedApplications[app.ApplicationID] = true
delete(m.acceptedApplications, app.ApplicationID)
+ delete(m.appStates, app.ApplicationID)
+ }
+ for _, app := range response.Updated {
+ m.appStates[app.ApplicationID] = app.State
}
return nil
}
@@ -83,6 +91,9 @@ func (m *mockRMCallback) UpdateAllocation(response
*si.AllocationResponse) error
}
for _, alloc := range response.Released {
delete(m.Allocations, alloc.AllocationID)
+ if alloc.TerminationType ==
si.TerminationType_PLACEHOLDER_REPLACED {
+ m.releasedPhs[alloc.AllocationID] = alloc
+ }
}
return nil
}
@@ -132,6 +143,15 @@ func (m *mockRMCallback) waitForRejectedApplication(t
*testing.T, appID string,
assert.NilError(t, err, "Failed to wait for rejected application: %s,
called from: %s", appID, caller())
}
+func (m *mockRMCallback) waitForApplicationState(t *testing.T, appID, state
string, timeoutMs int) {
+ err := common.WaitFor(10*time.Millisecond,
time.Duration(timeoutMs)*time.Millisecond, func() bool {
+ m.RLock()
+ defer m.RUnlock()
+ return m.appStates[appID] == state
+ })
+ assert.NilError(t, err, "Failed to wait for application %s state: %s,
called from: %s", appID, state, caller())
+}
+
func (m *mockRMCallback) waitForAcceptedNode(t *testing.T, nodeID string,
timeoutMs int) {
err := common.WaitFor(10*time.Millisecond,
time.Duration(timeoutMs)*time.Millisecond, func() bool {
m.RLock()
@@ -186,3 +206,14 @@ func (m *mockRMCallback) waitForMinAllocations(tb
testing.TB, nAlloc int, timeou
tb.Fatalf("Failed to wait for min allocations expected %d,
actual %d, called from: %s", nAlloc, allocLen, caller())
}
}
+
+func (m *mockRMCallback) waitForReleasedPlaceholders(t *testing.T, releases
int, timeoutMs int) {
+ var releasesLen int
+ err := common.WaitFor(10*time.Millisecond,
time.Duration(timeoutMs)*time.Millisecond, func() bool {
+ m.RLock()
+ defer m.RUnlock()
+ releasesLen = len(m.releasedPhs)
+ return releasesLen == releases
+ })
+ assert.NilError(t, err, "Failed to wait for placeholder releases,
expected %d, actual %d, called from: %s", releases, releasesLen, caller())
+}
diff --git a/pkg/scheduler/tests/recovery_test.go b/pkg/scheduler/tests/recovery_test.go
index 6a7b8d26..09e0f26e 100644
--- a/pkg/scheduler/tests/recovery_test.go
+++ b/pkg/scheduler/tests/recovery_test.go
@@ -931,3 +931,168 @@ partitions:
appQueue = part.GetQueue("root.app-1-namespace")
assert.Assert(t, appQueue != nil, "application queue was not created
after recovery")
}
+
+func TestPlaceholderRecovery(t *testing.T) { //nolint:funlen
+ // create an existing allocation
+ existingAllocations := make([]*si.Allocation, 1)
+ existingAllocations[0] = &si.Allocation{
+ AllocationKey: "ph-alloc-1",
+ NodeID: "node-1:1234",
+ ApplicationID: appID1,
+ TaskGroupName: "tg-1",
+ AllocationID: "ph-alloc-1-0",
+ ResourcePerAlloc: &si.Resource{
+ Resources: map[string]*si.Quantity{
+ "memory": {
+ Value: 10,
+ },
+ "vcore": {
+ Value: 1,
+ },
+ },
+ },
+ Placeholder: true,
+ }
+
+ config := `partitions:
+ - name: default
+ queues:
+ - name: root
+ submitacl: "*"
+ queues:
+ - name: default`
+ ms := &mockScheduler{}
+ defer ms.Stop()
+ err := ms.Init(config, true, false)
+ assert.NilError(t, err, "RegisterResourceManager failed")
+
+ // Add application
+ err = ms.proxy.UpdateApplication(&si.ApplicationRequest{
+ New: newAddAppRequest(map[string]string{appID1:
"root.default"}),
+ RmID: "rm:123",
+ })
+ assert.NilError(t, err, "ApplicationRequest failed")
+ ms.mockRM.waitForAcceptedApplication(t, appID1, 1000)
+
+ // Add node
+ err = ms.proxy.UpdateNode(&si.NodeRequest{
+ Nodes: []*si.NodeInfo{
+ {
+ NodeID: "node-1:1234",
+ Attributes: map[string]string{},
+ SchedulableResource: &si.Resource{
+ Resources: map[string]*si.Quantity{
+ "memory": {Value: 100},
+ "vcore": {Value: 20},
+ },
+ },
+ Action: si.NodeInfo_CREATE,
+ ExistingAllocations: existingAllocations,
+ },
+ },
+ RmID: "rm:123",
+ })
+ assert.NilError(t, err, "NodeRequest nodes and app for recovery failed")
+ ms.mockRM.waitForAcceptedNode(t, "node-1:1234", 1000)
+
+ // Add a new placeholder ask with a different task group
+ err = ms.proxy.UpdateAllocation(&si.AllocationRequest{
+ Asks: []*si.AllocationAsk{
+ {
+ AllocationKey: "ph-alloc-2",
+ ResourceAsk: &si.Resource{
+ Resources: map[string]*si.Quantity{
+ "memory": {Value: 10},
+ "vcore": {Value: 1},
+ },
+ },
+ MaxAllocations: 1,
+ ApplicationID: appID1,
+ TaskGroupName: "tg-2",
+ Placeholder: true,
+ },
+ },
+ RmID: "rm:123",
+ })
+ assert.NilError(t, err, "AllocationRequest failed for placeholder ask")
+ ms.mockRM.waitForAllocations(t, 1, 1000)
+
+ // Add two real asks
+ err = ms.proxy.UpdateAllocation(&si.AllocationRequest{
+ Asks: []*si.AllocationAsk{
+ {
+ AllocationKey: "real-alloc-1",
+ ResourceAsk: &si.Resource{
+ Resources: map[string]*si.Quantity{
+ "memory": {Value: 10},
+ "vcore": {Value: 1},
+ },
+ },
+ MaxAllocations: 1,
+ ApplicationID: appID1,
+ TaskGroupName: "tg-1",
+ },
+ {
+ AllocationKey: "real-alloc-2",
+ ResourceAsk: &si.Resource{
+ Resources: map[string]*si.Quantity{
+ "memory": {Value: 10},
+ "vcore": {Value: 1},
+ },
+ },
+ MaxAllocations: 1,
+ ApplicationID: appID1,
+ TaskGroupName: "tg-2",
+ },
+ },
+ RmID: "rm:123",
+ })
+ assert.NilError(t, err, "AllocationRequest failed for real asks")
+ ms.mockRM.waitForReleasedPlaceholders(t, 2, 1000)
+
+ // remove placeholder allocations
+ err = ms.proxy.UpdateAllocation(&si.AllocationRequest{
+ Releases: &si.AllocationReleasesRequest{
+ AllocationsToRelease: []*si.AllocationRelease{
+ {
+ ApplicationID: appID1,
+ PartitionName: "default",
+ AllocationID: "ph-alloc-1-0",
+ TerminationType:
si.TerminationType_PLACEHOLDER_REPLACED,
+ },
+ {
+ ApplicationID: appID1,
+ PartitionName: "default",
+ AllocationID: "ph-alloc-2-0",
+ TerminationType:
si.TerminationType_PLACEHOLDER_REPLACED,
+ },
+ },
+ },
+ RmID: "rm:123",
+ })
+ assert.NilError(t, err, "AllocationReleasesRequest failed for
placeholders")
+
+ // remove real allocations
+ err = ms.proxy.UpdateAllocation(&si.AllocationRequest{
+ Releases: &si.AllocationReleasesRequest{
+ AllocationsToRelease: []*si.AllocationRelease{
+ {
+ ApplicationID: appID1,
+ PartitionName: "default",
+ AllocationID: "real-alloc-1-0",
+ TerminationType:
si.TerminationType_STOPPED_BY_RM,
+ },
+ {
+ ApplicationID: appID1,
+ PartitionName: "default",
+ AllocationID: "real-alloc-2-0",
+ TerminationType:
si.TerminationType_STOPPED_BY_RM,
+ },
+ },
+ },
+ RmID: "rm:123",
+ })
+ assert.NilError(t, err, "AllocationReleasesRequest failed for real
allocations")
+
+ ms.mockRM.waitForApplicationState(t, appID1, "Completing", 1000)
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]