This is an automated email from the ASF dual-hosted git repository.
ccondit pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-k8shim.git
The following commit(s) were added to refs/heads/master by this push:
new c384a882 [YUNIKORN-2467] Remove AllocationAsk from the core when a pod
is completed (#797)
c384a882 is described below
commit c384a8827f84018a8b6fe3bc4b8a2bda01a6cb66
Author: Peter Bacsko <[email protected]>
AuthorDate: Tue Mar 5 16:44:55 2024 -0600
[YUNIKORN-2467] Remove AllocationAsk from the core when a pod is completed
(#797)
Closes: #797
Signed-off-by: Craig Condit <[email protected]>
---
pkg/cache/task.go | 2 +-
pkg/cache/task_test.go | 2 +-
pkg/common/si_helper.go | 13 +++++++++++--
pkg/common/si_helper_test.go | 9 ++++++---
pkg/shim/scheduler_mock_test.go | 5 +++++
pkg/shim/scheduler_test.go | 3 +++
6 files changed, 27 insertions(+), 7 deletions(-)
diff --git a/pkg/cache/task.go b/pkg/cache/task.go
index 223fd699..3a41157d 100644
--- a/pkg/cache/task.go
+++ b/pkg/cache/task.go
@@ -535,7 +535,7 @@ func (task *Task) releaseAllocation() {
return
}
releaseRequest =
common.CreateReleaseAllocationRequestForTask(
- task.applicationID, task.allocationID,
task.application.partition, task.terminationType)
+ task.applicationID, task.taskID,
task.allocationID, task.application.partition, task.terminationType)
}
if releaseRequest.Releases != nil {
diff --git a/pkg/cache/task_test.go b/pkg/cache/task_test.go
index d31339c9..1f36c7b4 100644
--- a/pkg/cache/task_test.go
+++ b/pkg/cache/task_test.go
@@ -643,7 +643,7 @@ func TestSimultaneousTaskCompleteAndAllocate(t *testing.T) {
PartitionName: "default",
}
mockedAPIProvider.MockSchedulerAPIUpdateAllocationFn(func(request
*si.AllocationRequest) error {
- assert.Equal(t, len(request.Releases.AllocationAsksToRelease),
0,
+ assert.Equal(t, len(request.Releases.AllocationAsksToRelease),
1,
"allocationAskToRelease is not in the expected length")
assert.Equal(t, len(request.Releases.AllocationsToRelease), 1,
"allocationsToRelease is not in the expected length")
diff --git a/pkg/common/si_helper.go b/pkg/common/si_helper.go
index 8af265eb..4434a82b 100644
--- a/pkg/common/si_helper.go
+++ b/pkg/common/si_helper.go
@@ -142,7 +142,7 @@ func GetTerminationTypeFromString(terminationTypeStr
string) si.TerminationType
return si.TerminationType_STOPPED_BY_RM
}
-func CreateReleaseAllocationRequestForTask(appID, allocationID, partition,
terminationType string) *si.AllocationRequest {
+func CreateReleaseAllocationRequestForTask(appID, taskID, allocationID,
partition, terminationType string) *si.AllocationRequest {
toReleases := make([]*si.AllocationRelease, 0)
toReleases = append(toReleases, &si.AllocationRelease{
ApplicationID: appID,
@@ -152,8 +152,17 @@ func CreateReleaseAllocationRequestForTask(appID,
allocationID, partition, termi
Message: "task completed",
})
+ toReleaseAsk := make([]*si.AllocationAskRelease, 1)
+ toReleaseAsk[0] = &si.AllocationAskRelease{
+ ApplicationID: appID,
+ AllocationKey: taskID,
+ PartitionName: partition,
+ Message: "task request completed",
+ }
+
releaseRequest := si.AllocationReleasesRequest{
- AllocationsToRelease: toReleases,
+ AllocationsToRelease: toReleases,
+ AllocationAsksToRelease: toReleaseAsk,
}
return &si.AllocationRequest{
diff --git a/pkg/common/si_helper_test.go b/pkg/common/si_helper_test.go
index 251cdd97..f3a5fcf6 100644
--- a/pkg/common/si_helper_test.go
+++ b/pkg/common/si_helper_test.go
@@ -33,15 +33,18 @@ import (
const nodeID = "node-01"
func TestCreateReleaseAllocationRequest(t *testing.T) {
- request := CreateReleaseAllocationRequestForTask("app01", "alloc01",
"default", "STOPPED_BY_RM")
+ request := CreateReleaseAllocationRequestForTask("app01", "task01",
"alloc01", "default", "STOPPED_BY_RM")
assert.Assert(t, request.Releases != nil)
assert.Assert(t, request.Releases.AllocationsToRelease != nil)
- assert.Assert(t, request.Releases.AllocationAsksToRelease == nil)
+ assert.Assert(t, request.Releases.AllocationAsksToRelease != nil)
assert.Equal(t, len(request.Releases.AllocationsToRelease), 1)
- assert.Equal(t, len(request.Releases.AllocationAsksToRelease), 0)
+ assert.Equal(t, len(request.Releases.AllocationAsksToRelease), 1)
assert.Equal(t, request.Releases.AllocationsToRelease[0].ApplicationID,
"app01")
assert.Equal(t, request.Releases.AllocationsToRelease[0].AllocationID,
"alloc01")
assert.Equal(t, request.Releases.AllocationsToRelease[0].PartitionName,
"default")
+ assert.Equal(t,
request.Releases.AllocationAsksToRelease[0].ApplicationID, "app01")
+ assert.Equal(t,
request.Releases.AllocationAsksToRelease[0].AllocationKey, "task01")
+ assert.Equal(t,
request.Releases.AllocationAsksToRelease[0].PartitionName, "default")
}
func TestCreateReleaseAskRequestForTask(t *testing.T) {
diff --git a/pkg/shim/scheduler_mock_test.go b/pkg/shim/scheduler_mock_test.go
index dd12a93d..9b13ee7c 100644
--- a/pkg/shim/scheduler_mock_test.go
+++ b/pkg/shim/scheduler_mock_test.go
@@ -32,6 +32,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"github.com/apache/yunikorn-core/pkg/entrypoint"
+ "github.com/apache/yunikorn-core/pkg/scheduler/objects"
"github.com/apache/yunikorn-k8shim/pkg/cache"
"github.com/apache/yunikorn-k8shim/pkg/client"
"github.com/apache/yunikorn-k8shim/pkg/common"
@@ -322,6 +323,10 @@ func (fc *MockScheduler)
waitForApplicationStateInCore(appID, partition, expecte
}, time.Second, 5*time.Second)
}
+func (fc *MockScheduler) getApplicationFromCore(appID, partition string)
*objects.Application {
+ return
fc.coreContext.Scheduler.GetClusterContext().GetApplication(appID, partition)
+}
+
func (fc *MockScheduler) GetPodBindStats() client.BindStats {
return fc.apiProvider.GetPodBindStats()
}
diff --git a/pkg/shim/scheduler_test.go b/pkg/shim/scheduler_test.go
index 38fe981f..84bb52ba 100644
--- a/pkg/shim/scheduler_test.go
+++ b/pkg/shim/scheduler_test.go
@@ -104,6 +104,9 @@ partitions:
cluster.waitAndAssertTaskState(t, "app0001", "task0002",
cache.TaskStates().Completed)
err = cluster.waitForApplicationStateInCore("app0001", partitionName,
"Completing")
assert.NilError(t, err)
+ app := cluster.getApplicationFromCore("app0001", partitionName)
+ assert.Equal(t, 0, len(app.GetAllRequests()), "asks were not removed
from the application")
+ assert.Equal(t, 0, len(app.GetAllAllocations()), "allocations were not
removed from the application")
}
func TestRejectApplications(t *testing.T) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]