This is an automated email from the ASF dual-hosted git repository.
ccondit pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-k8shim.git
The following commit(s) were added to refs/heads/master by this push:
new a1b8480d [YUNIKORN-2588] Shim: Replace AllocationID with AllocationKey (#828)
a1b8480d is described below
commit a1b8480d9a2b5580bbfdbfccbc2631da343937c5
Author: Craig Condit <[email protected]>
AuthorDate: Sat Apr 27 06:27:16 2024 -0500
[YUNIKORN-2588] Shim: Replace AllocationID with AllocationKey (#828)
The scheduler interface has been updated to use AllocationKey instead
of AllocationID (as these are now equivalent). Update references to
reflect this change.
Closes: #828
---
go.mod | 4 +-
go.sum | 8 +--
pkg/cache/application.go | 6 +--
pkg/cache/application_state.go | 12 ++---
pkg/cache/application_state_test.go | 60 +++++++++++-----------
pkg/cache/application_test.go | 29 ++++++-----
pkg/cache/context.go | 1 -
pkg/cache/context_test.go | 23 ++++-----
pkg/cache/scheduler_callback.go | 11 ++--
pkg/cache/task.go | 36 ++++++-------
pkg/cache/task_state.go | 14 ++---
pkg/cache/task_test.go | 28 +++++-----
pkg/cache/utils_test.go | 5 +-
pkg/common/si_helper.go | 7 ++-
pkg/common/si_helper_test.go | 8 +--
pkg/common/test/recoverable_apps_mock.go | 3 +-
test/e2e/basic_scheduling/basic_scheduling_test.go | 1 -
.../recovery_and_restart_test.go | 1 -
18 files changed, 124 insertions(+), 133 deletions(-)
diff --git a/go.mod b/go.mod
index 0d16d4c4..b4de07f0 100644
--- a/go.mod
+++ b/go.mod
@@ -21,8 +21,8 @@ module github.com/apache/yunikorn-k8shim
go 1.21
require (
- github.com/apache/yunikorn-core v0.0.0-20240424145521-e17eafaab1c8
- github.com/apache/yunikorn-scheduler-interface
v0.0.0-20240423191701-8c98b1604a7a
+ github.com/apache/yunikorn-core v0.0.0-20240427112336-e3d94294b4ef
+ github.com/apache/yunikorn-scheduler-interface
v0.0.0-20240425182941-07f5695119a1
github.com/google/go-cmp v0.6.0
github.com/google/uuid v1.6.0
github.com/looplab/fsm v1.0.1
diff --git a/go.sum b/go.sum
index 5f983d0f..2efcb230 100644
--- a/go.sum
+++ b/go.sum
@@ -9,10 +9,10 @@ github.com/NYTimes/gziphandler v1.1.1
h1:ZUDjpQae29j0ryrS0u/B8HZfJBtBQHjqw2rQ2cq
github.com/NYTimes/gziphandler v1.1.1/go.mod
h1:n/CVRwUEOgIxrgPvAQhUUr9oeUtvrhMomdKFjzJNB0c=
github.com/antlr/antlr4/runtime/Go/antlr/v4 v4.0.0-20230305170008-8188dc5388df
h1:7RFfzj4SSt6nnvCPbCqijJi1nWCd+TqAT3bYCStRC18=
github.com/antlr/antlr4/runtime/Go/antlr/v4
v4.0.0-20230305170008-8188dc5388df/go.mod
h1:pSwJ0fSY5KhvocuWSx4fz3BA8OrA1bQn+K1Eli3BRwM=
-github.com/apache/yunikorn-core v0.0.0-20240424145521-e17eafaab1c8
h1:xPcUd/tDOrIatdgyzCgUtRlvfM4s/DN4cS49iyw3rnU=
-github.com/apache/yunikorn-core v0.0.0-20240424145521-e17eafaab1c8/go.mod
h1:ZXkFNHrLLReWAcEGj6Ya3hkmr5lMpa9WgIy4Lx0dlxw=
-github.com/apache/yunikorn-scheduler-interface
v0.0.0-20240423191701-8c98b1604a7a
h1:H978zsTL2FvbRFnySO83DOFLO33PwHWFdmHvMoSVXsc=
-github.com/apache/yunikorn-scheduler-interface
v0.0.0-20240423191701-8c98b1604a7a/go.mod
h1:WuHJpVk34t8N5+1ErYGj/5Qq33/cRzL4YtuoAsbMtWc=
+github.com/apache/yunikorn-core v0.0.0-20240427112336-e3d94294b4ef
h1:m/wkG8mJqJ/eAihWR/g5IVfkHl79ve5geXXKM0U/slE=
+github.com/apache/yunikorn-core v0.0.0-20240427112336-e3d94294b4ef/go.mod
h1:BaNNx6FksvVu/8tfo7qNQvVhAsD284ybdJnLoFIexgA=
+github.com/apache/yunikorn-scheduler-interface
v0.0.0-20240425182941-07f5695119a1
h1:v4J9L3MlW8BQfYnbq6FV2l3uyay3SqMS2Ffpo+SFat4=
+github.com/apache/yunikorn-scheduler-interface
v0.0.0-20240425182941-07f5695119a1/go.mod
h1:WuHJpVk34t8N5+1ErYGj/5Qq33/cRzL4YtuoAsbMtWc=
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5
h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio=
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod
h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs=
github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a
h1:idn718Q4B6AGu/h5Sxe66HYVdqdGu2l9Iebqhi/AEoA=
diff --git a/pkg/cache/application.go b/pkg/cache/application.go
index f43d97f4..3420cc43 100644
--- a/pkg/cache/application.go
+++ b/pkg/cache/application.go
@@ -598,14 +598,14 @@ func (app *Application) handleFailApplicationEvent(errMsg
string) {
}
}
-func (app *Application) handleReleaseAppAllocationEvent(allocationID string,
terminationType string) {
+func (app *Application) handleReleaseAppAllocationEvent(allocationKey string,
terminationType string) {
log.Log(log.ShimCacheApplication).Info("try to release pod from
application",
zap.String("appID", app.applicationID),
- zap.String("allocationID", allocationID),
+ zap.String("allocationKey", allocationKey),
zap.String("terminationType", terminationType))
for _, task := range app.taskMap {
- if task.allocationID == allocationID {
+ if task.allocationKey == allocationKey {
task.setTaskTerminationType(terminationType)
err := task.DeleteTaskPod()
if err != nil {
diff --git a/pkg/cache/application_state.go b/pkg/cache/application_state.go
index cb629394..b64f9486 100644
--- a/pkg/cache/application_state.go
+++ b/pkg/cache/application_state.go
@@ -267,15 +267,15 @@ func (ue UpdateApplicationReservationEvent)
GetApplicationID() string {
// ------------------------
type ReleaseAppAllocationEvent struct {
applicationID string
- allocationID string
+ allocationKey string
terminationType string
event ApplicationEventType
}
-func NewReleaseAppAllocationEvent(appID string, allocTermination
si.TerminationType, allocationID string) ReleaseAppAllocationEvent {
+func NewReleaseAppAllocationEvent(appID string, allocTermination
si.TerminationType, allocationKey string) ReleaseAppAllocationEvent {
return ReleaseAppAllocationEvent{
applicationID: appID,
- allocationID: allocationID,
+ allocationKey: allocationKey,
terminationType:
si.TerminationType_name[int32(allocTermination)],
event: ReleaseAppAllocation,
}
@@ -287,7 +287,7 @@ func (re ReleaseAppAllocationEvent) GetApplicationID()
string {
func (re ReleaseAppAllocationEvent) GetArgs() []interface{} {
args := make([]interface{}, 2)
- args[0] = re.allocationID
+ args[0] = re.allocationKey
args[1] = re.terminationType
return args
}
@@ -546,9 +546,9 @@ func newAppState() *fsm.FSM { //nolint:funlen
log.Log(log.ShimFSM).Error("fail to
parse event arg", zap.Error(err))
return
}
- allocationID := eventArgs[0]
+ allocationKey := eventArgs[0]
terminationType := eventArgs[1]
-
app.handleReleaseAppAllocationEvent(allocationID, terminationType)
+
app.handleReleaseAppAllocationEvent(allocationKey, terminationType)
},
ReleaseAppAllocationAsk.String(): func(_
context.Context, event *fsm.Event) {
app := event.Args[0].(*Application)
//nolint:errcheck
diff --git a/pkg/cache/application_state_test.go
b/pkg/cache/application_state_test.go
index c2f94e93..1247a488 100644
--- a/pkg/cache/application_state_test.go
+++ b/pkg/cache/application_state_test.go
@@ -671,22 +671,22 @@ func
TestUpdateApplicationReservationEventGetApplicationID(t *testing.T) {
func TestNewReleaseAppAllocationEvent(t *testing.T) {
tests := []struct {
- name string
- appID, allocationID string
- terminationType si.TerminationType
- wantID, wantAllocationID, wantType string
- wantEvent ApplicationEventType
+ name string
+ appID, allocationKey string
+ terminationType si.TerminationType
+ wantID, wantAllocationKey, wantType string
+ wantEvent ApplicationEventType
}{
- {TestCreateName, "testAppId001", "testAllocationID001",
si.TerminationType_TIMEOUT, "testAppId001", "testAllocationID001", "TIMEOUT",
ReleaseAppAllocation},
+ {TestCreateName, "testAppId001", "testTaskId001",
si.TerminationType_TIMEOUT, "testAppId001", "testTaskId001", "TIMEOUT",
ReleaseAppAllocation},
}
for _, tt := range tests {
- instance := NewReleaseAppAllocationEvent(tt.appID,
tt.terminationType, tt.allocationID)
+ instance := NewReleaseAppAllocationEvent(tt.appID,
tt.terminationType, tt.allocationKey)
t.Run(tt.name, func(t *testing.T) {
- if instance.applicationID != tt.wantID ||
instance.allocationID != tt.wantAllocationID || instance.terminationType !=
tt.wantType || instance.event != tt.wantEvent {
+ if instance.applicationID != tt.wantID ||
instance.allocationKey != tt.wantAllocationKey || instance.terminationType !=
tt.wantType || instance.event != tt.wantEvent {
t.Errorf("want %s %s %s %s, got %s %s %s %s",
- tt.wantID, tt.wantAllocationID,
tt.wantType, tt.wantEvent,
- instance.applicationID,
instance.allocationID, instance.terminationType, instance.event)
+ tt.wantID, tt.wantAllocationKey,
tt.wantType, tt.wantEvent,
+ instance.applicationID,
instance.allocationKey, instance.terminationType, instance.event)
}
})
}
@@ -694,16 +694,16 @@ func TestNewReleaseAppAllocationEvent(t *testing.T) {
func TestReleaseAppAllocationEventGetEvent(t *testing.T) {
tests := []struct {
- name string
- appID, allocationID string
- terminationType si.TerminationType
- wantEvent ApplicationEventType
+ name string
+ appID, allocationKey string
+ terminationType si.TerminationType
+ wantEvent ApplicationEventType
}{
- {TestEventName, "testAppId001", "testAllocationID001",
si.TerminationType_TIMEOUT, ReleaseAppAllocation},
+ {TestEventName, "testAppId001", "testTaskId001",
si.TerminationType_TIMEOUT, ReleaseAppAllocation},
}
for _, tt := range tests {
- instance := NewReleaseAppAllocationEvent(tt.appID,
tt.terminationType, tt.allocationID)
+ instance := NewReleaseAppAllocationEvent(tt.appID,
tt.terminationType, tt.allocationKey)
event := instance.GetEvent()
t.Run(tt.name, func(t *testing.T) {
if event != tt.wantEvent.String() {
@@ -715,18 +715,18 @@ func TestReleaseAppAllocationEventGetEvent(t *testing.T) {
func TestReleaseAppAllocationEventGetArgs(t *testing.T) {
tests := []struct {
- name string
- appID, allocationID string
- terminationType si.TerminationType
- wantLen int
- castOk []bool
- wantArg []string
+ name string
+ appID, allocationKey string
+ terminationType si.TerminationType
+ wantLen int
+ castOk []bool
+ wantArg []string
}{
- {TestArgsName, "testAppId001", "testAllocationID001",
si.TerminationType_TIMEOUT, 2, []bool{true, true},
[]string{"testAllocationID001", "TIMEOUT"}},
+ {TestArgsName, "testAppId001", "testTaskId001",
si.TerminationType_TIMEOUT, 2, []bool{true, true}, []string{"testTaskId001",
"TIMEOUT"}},
}
for _, tt := range tests {
- instance := NewReleaseAppAllocationEvent(tt.appID,
tt.terminationType, tt.allocationID)
+ instance := NewReleaseAppAllocationEvent(tt.appID,
tt.terminationType, tt.allocationKey)
args := instance.GetArgs()
t.Run(tt.name, func(t *testing.T) {
if len(args) != tt.wantLen {
@@ -748,16 +748,16 @@ func TestReleaseAppAllocationEventGetArgs(t *testing.T) {
func TestReleaseAppAllocationEventGetApplicationID(t *testing.T) {
tests := []struct {
- name string
- appID, allocationID string
- terminationType si.TerminationType
- wantID string
+ name string
+ appID, allocationKey string
+ terminationType si.TerminationType
+ wantID string
}{
- {TestAppIDName, "testAppId001", "testAllocationID001",
si.TerminationType_TIMEOUT, "testAppId001"},
+ {TestAppIDName, "testAppId001", "testTaskId001",
si.TerminationType_TIMEOUT, "testAppId001"},
}
for _, tt := range tests {
- instance := NewReleaseAppAllocationEvent(tt.appID,
tt.terminationType, tt.allocationID)
+ instance := NewReleaseAppAllocationEvent(tt.appID,
tt.terminationType, tt.allocationKey)
appID := instance.GetApplicationID()
t.Run(tt.name, func(t *testing.T) {
if appID != tt.wantID {
diff --git a/pkg/cache/application_test.go b/pkg/cache/application_test.go
index 874a6e14..56c9e818 100644
--- a/pkg/cache/application_test.go
+++ b/pkg/cache/application_test.go
@@ -442,9 +442,9 @@ func TestReleaseAppAllocation(t *testing.T) {
app := NewApplication(appID, "root.abc", "testuser", testGroups,
map[string]string{}, ms)
task := NewTask("task01", app, context, pod)
app.addTask(task)
- task.allocationID = taskAllocationID
+ task.allocationKey = task.taskID
// app must be running states
- err := app.handle(NewReleaseAppAllocationEvent(appID,
si.TerminationType_TIMEOUT, taskAllocationID))
+ err := app.handle(NewReleaseAppAllocationEvent(appID,
si.TerminationType_TIMEOUT, task.taskID))
if err == nil {
// this should give an error
t.Error("expecting error got 'nil'")
@@ -452,7 +452,7 @@ func TestReleaseAppAllocation(t *testing.T) {
// set app states to running, let event can be trigger
app.SetState(ApplicationStates().Running)
assertAppState(t, app, ApplicationStates().Running, 3*time.Second)
- err = app.handle(NewReleaseAppAllocationEvent(appID,
si.TerminationType_TIMEOUT, taskAllocationID))
+ err = app.handle(NewReleaseAppAllocationEvent(appID,
si.TerminationType_TIMEOUT, task.taskID))
assert.NilError(t, err)
// after handle release event the states of app must be running
assertAppState(t, app, ApplicationStates().Running, 3*time.Second)
@@ -823,7 +823,7 @@ func TestTryReservePostRestart(t *testing.T) {
Containers: containers,
},
})
- task0.allocationID = string(task0.pod.UID)
+ task0.allocationKey = string(task0.pod.UID)
task0.nodeName = "fake-host"
task0.sm.SetState(TaskStates().Allocated)
@@ -1002,9 +1002,10 @@ func TestReleaseAppAllocationInFailingState(t
*testing.T) {
app := NewApplication(appID, "root.abc", "testuser", testGroups,
map[string]string{}, ms)
task := NewTask("task01", app, context, pod)
app.addTask(task)
- task.allocationID = taskAllocationID
+ task.allocationKey = task.taskID
+
// app must be running states
- err := app.handle(NewReleaseAppAllocationEvent(appID,
si.TerminationType_TIMEOUT, taskAllocationID))
+ err := app.handle(NewReleaseAppAllocationEvent(appID,
si.TerminationType_TIMEOUT, task.taskID))
if err == nil {
// this should give an error
t.Error("expecting error got 'nil'")
@@ -1012,12 +1013,12 @@ func TestReleaseAppAllocationInFailingState(t
*testing.T) {
// set app states to running, let event can be trigger
app.SetState(ApplicationStates().Running)
assertAppState(t, app, ApplicationStates().Running, 3*time.Second)
- err = app.handle(NewReleaseAppAllocationEvent(appID,
si.TerminationType_TIMEOUT, taskAllocationID))
+ err = app.handle(NewReleaseAppAllocationEvent(appID,
si.TerminationType_TIMEOUT, task.taskID))
assert.NilError(t, err)
// after handle release event the states of app must be running
assertAppState(t, app, ApplicationStates().Running, 3*time.Second)
app.SetState(ApplicationStates().Failing)
- err = app.handle(NewReleaseAppAllocationEvent(appID,
si.TerminationType_TIMEOUT, taskAllocationID))
+ err = app.handle(NewReleaseAppAllocationEvent(appID,
si.TerminationType_TIMEOUT, task.taskID))
assert.NilError(t, err)
// after handle release event the states of app must be failing
assertAppState(t, app, ApplicationStates().Failing, 3*time.Second)
@@ -1057,7 +1058,7 @@ func TestResumingStateTransitions(t *testing.T) {
// Add tasks
app.addTask(task1)
app.addTask(task2)
- task1.allocationID = taskAllocationID
+ task1.allocationKey = task1.taskID
context.addApplicationToContext(app)
// Set app state to "reserving"
@@ -1127,7 +1128,7 @@ func TestPlaceholderTimeoutEvents(t *testing.T) {
ObjectMeta: apis.ObjectMeta{
Name: "pod00001",
Namespace: "default",
- UID: "UID-POD-00001",
+ UID: "task01",
Labels: map[string]string{
"queue": "root.a",
"applicationId": "app00001",
@@ -1150,7 +1151,7 @@ func TestPlaceholderTimeoutEvents(t *testing.T) {
ObjectMeta: apis.ObjectMeta{
Name: "pod00002",
Namespace: "default",
- UID: "UID-POD-00002",
+ UID: "task02",
Labels: map[string]string{
"queue": "root.a",
"applicationId": "app00001",
@@ -1169,7 +1170,7 @@ func TestPlaceholderTimeoutEvents(t *testing.T) {
assert.Equal(t, len(app.GetNewTasks()), 1)
appID := "app00001"
- allocationID := "UID-POD-00002"
+ allocationKey := "task02"
task1 := context.AddTask(&AddTaskRequest{
Metadata: TaskMetadata{
@@ -1185,12 +1186,12 @@ func TestPlaceholderTimeoutEvents(t *testing.T) {
_, taskErr := app.GetTask("task02")
assert.NilError(t, taskErr, "Task should exist")
- task1.allocationID = allocationID
+ task1.allocationKey = allocationKey
// set app states to running, let event can be trigger
app.SetState(ApplicationStates().Running)
assertAppState(t, app, ApplicationStates().Running, 3*time.Second)
- err := app.handle(NewReleaseAppAllocationEvent(appID,
si.TerminationType_TIMEOUT, allocationID))
+ err := app.handle(NewReleaseAppAllocationEvent(appID,
si.TerminationType_TIMEOUT, allocationKey))
assert.NilError(t, err)
// after handle release event the states of app must be running
assertAppState(t, app, ApplicationStates().Running, 3*time.Second)
diff --git a/pkg/cache/context.go b/pkg/cache/context.go
index 6c9f34e2..301f1096 100644
--- a/pkg/cache/context.go
+++ b/pkg/cache/context.go
@@ -1737,7 +1737,6 @@ func getExistingAllocation(pod *v1.Pod) *si.Allocation {
return &si.Allocation{
AllocationKey: string(pod.UID),
AllocationTags: meta.Tags,
- AllocationID: string(pod.UID),
ResourcePerAlloc: common.GetPodResource(pod),
NodeID: pod.Spec.NodeName,
ApplicationID: meta.ApplicationID,
diff --git a/pkg/cache/context_test.go b/pkg/cache/context_test.go
index 33e4f10f..103c9369 100644
--- a/pkg/cache/context_test.go
+++ b/pkg/cache/context_test.go
@@ -870,7 +870,7 @@ func TestRecoverTask(t *testing.T) {
applicationID: alloc.ApplicationID,
taskID: alloc.AllocationKey,
nodeID: alloc.NodeID,
- allocationID: alloc.AllocationID,
+ allocationKey: alloc.AllocationKey,
event: TaskAllocated,
})
}
@@ -922,7 +922,7 @@ func TestRecoverTask(t *testing.T) {
err := utils.WaitForCondition(func() bool {
return task.GetTaskState() == TaskStates().Bound
}, 100*time.Millisecond, 3*time.Second)
- assert.NilError(t, err, "failed to wait for allocation allocationID
being set for task")
+ assert.NilError(t, err, "failed to wait for allocation allocationKey
being set for task")
// add a tasks to the existing application
// this task was already completed with state: Succeed
@@ -971,11 +971,11 @@ func TestRecoverTask(t *testing.T) {
assert.Equal(t, len(app.getTasks(TaskStates().New)), 1)
taskInfoVerifiers := []struct {
- taskID string
- expectedState string
- expectedAllocationID string
- expectedPodName string
- expectedNodeName string
+ taskID string
+ expectedState string
+ expectedAllocationKey string
+ expectedPodName string
+ expectedNodeName string
}{
{taskUID1, TaskStates().Bound, taskUID1, "pod1", fakeNodeName},
{taskUID2, TaskStates().Completed, taskUID2, "pod2",
fakeNodeName},
@@ -989,7 +989,7 @@ func TestRecoverTask(t *testing.T) {
rt, err := app.GetTask(tt.taskID)
assert.NilError(t, err)
assert.Equal(t, rt.GetTaskState(), tt.expectedState)
- assert.Equal(t, rt.allocationID,
tt.expectedAllocationID)
+ assert.Equal(t, rt.allocationKey,
tt.expectedAllocationKey)
assert.Equal(t, rt.pod.Name, tt.expectedPodName)
assert.Equal(t, rt.alias, fmt.Sprintf("%s/%s",
podNamespace, tt.expectedPodName))
})
@@ -1011,7 +1011,7 @@ func TestTaskReleaseAfterRecovery(t *testing.T) {
applicationID: alloc.ApplicationID,
taskID: alloc.AllocationKey,
nodeID: alloc.NodeID,
- allocationID: alloc.AllocationID,
+ allocationKey: alloc.AllocationKey,
event: TaskAllocated,
})
}
@@ -1061,7 +1061,7 @@ func TestTaskReleaseAfterRecovery(t *testing.T) {
err := utils.WaitForCondition(func() bool {
return task0.GetTaskState() == TaskStates().Bound
}, 100*time.Millisecond, 3*time.Second)
- assert.NilError(t, err, "failed to wait for allocation allocationID
being set for task0")
+ assert.NilError(t, err, "failed to wait for allocation allocationKey
being set for task0")
task1 := context.AddTask(&AddTaskRequest{
Metadata: TaskMetadata{
@@ -1080,7 +1080,7 @@ func TestTaskReleaseAfterRecovery(t *testing.T) {
err = utils.WaitForCondition(func() bool {
return task1.GetTaskState() == TaskStates().Bound
}, 100*time.Millisecond, 3*time.Second)
- assert.NilError(t, err, "failed to wait for allocation allocationID
being set for task1")
+ assert.NilError(t, err, "failed to wait for allocation allocationKey
being set for task1")
// app should have 2 tasks recovered
app, exist := context.applications[appID]
@@ -1955,7 +1955,6 @@ func TestGetExistingAllocation(t *testing.T) {
alloc := getExistingAllocation(pod)
assert.Equal(t, alloc.ApplicationID, "app00001")
assert.Equal(t, alloc.AllocationKey, string(pod.UID))
- assert.Equal(t, alloc.AllocationID, string(pod.UID))
assert.Equal(t, alloc.NodeID, "allocated-node")
}
diff --git a/pkg/cache/scheduler_callback.go b/pkg/cache/scheduler_callback.go
index 81163572..e7ce30dc 100644
--- a/pkg/cache/scheduler_callback.go
+++ b/pkg/cache/scheduler_callback.go
@@ -51,14 +51,13 @@ func (callback *AsyncRMCallback) UpdateAllocation(response
*si.AllocationRespons
// got allocation for pod, bind pod to the scheduled node
log.Log(log.ShimRMCallback).Debug("callback: response to new
allocation",
zap.String("allocationKey", alloc.AllocationKey),
- zap.String("allocationID", alloc.AllocationID),
zap.String("applicationID", alloc.ApplicationID),
zap.String("nodeID", alloc.NodeID))
// update cache
task := callback.context.getTask(alloc.ApplicationID,
alloc.AllocationKey)
if task != nil {
- task.setAllocationID(alloc.AllocationID)
+ task.setAllocationKey(alloc.AllocationKey)
} else {
log.Log(log.ShimRMCallback).Warn("Unable to get task",
zap.String("taskID", alloc.AllocationKey))
}
@@ -72,9 +71,9 @@ func (callback *AsyncRMCallback) UpdateAllocation(response
*si.AllocationRespons
if task != nil {
if utils.IsAssignedPod(task.GetTaskPod()) {
// task is already bound, fixup state
and continue
-
task.MarkPreviouslyAllocated(alloc.AllocationID, alloc.NodeID)
+
task.MarkPreviouslyAllocated(alloc.AllocationKey, alloc.NodeID)
} else {
- ev :=
NewAllocateTaskEvent(app.GetApplicationID(), alloc.AllocationKey,
alloc.AllocationID, alloc.NodeID)
+ ev :=
NewAllocateTaskEvent(app.GetApplicationID(), task.taskID, alloc.AllocationKey,
alloc.NodeID)
dispatcher.Dispatch(ev)
}
}
@@ -105,7 +104,7 @@ func (callback *AsyncRMCallback) UpdateAllocation(response
*si.AllocationRespons
for _, release := range response.Released {
log.Log(log.ShimRMCallback).Debug("callback: response to
released allocations",
- zap.String("AllocationID", release.AllocationID))
+ zap.String("AllocationKey", release.AllocationKey))
// update cache
callback.context.ForgetPod(release.GetAllocationKey())
@@ -113,7 +112,7 @@ func (callback *AsyncRMCallback) UpdateAllocation(response
*si.AllocationRespons
// TerminationType 0 mean STOPPED_BY_RM
if release.TerminationType != si.TerminationType_STOPPED_BY_RM {
// send release app allocation to application states
machine
- ev :=
NewReleaseAppAllocationEvent(release.ApplicationID, release.TerminationType,
release.AllocationID)
+ ev :=
NewReleaseAppAllocationEvent(release.ApplicationID, release.TerminationType,
release.AllocationKey)
dispatcher.Dispatch(ev)
}
}
diff --git a/pkg/cache/task.go b/pkg/cache/task.go
index bb54f4c0..05b1b889 100644
--- a/pkg/cache/task.go
+++ b/pkg/cache/task.go
@@ -44,7 +44,7 @@ type Task struct {
alias string
applicationID string
application *Application
- allocationID string
+ allocationKey string
resource *si.Resource
pod *v1.Pod
podStatus v1.PodStatus // pod status, maintained separately for
efficiency reasons
@@ -169,10 +169,10 @@ func (task *Task) getTaskGroupName() string {
return task.taskGroupName
}
-func (task *Task) getTaskAllocationID() string {
+func (task *Task) getNodeName() string {
task.lock.RLock()
defer task.lock.RUnlock()
- return task.allocationID
+ return task.nodeName
}
func (task *Task) DeleteTaskPod() error {
@@ -210,13 +210,13 @@ func (task *Task) initialize() {
// the resources were already released, instead of starting
// from New, directly set the task to Completed
if utils.IsPodTerminated(task.pod) {
- task.allocationID = string(task.pod.UID)
+ task.allocationKey = string(task.pod.UID)
task.nodeName = task.pod.Spec.NodeName
task.sm.SetState(TaskStates().Completed)
log.Log(log.ShimCacheTask).Info("set task as Completed",
zap.String("appID", task.applicationID),
zap.String("taskID", task.taskID),
- zap.String("allocationID", task.allocationID),
+ zap.String("allocationKey", task.allocationKey),
zap.String("nodeName", task.nodeName))
}
}
@@ -260,12 +260,12 @@ func (task *Task) SetTaskSchedulingState(state
TaskSchedulingState) {
task.schedulingState = state
}
-func (task *Task) MarkPreviouslyAllocated(allocationID string, nodeID string) {
+func (task *Task) MarkPreviouslyAllocated(allocationKey string, nodeID string)
{
task.sm.SetState(TaskStates().Bound)
task.lock.Lock()
defer task.lock.Unlock()
task.schedulingState = TaskSchedAllocated
- task.allocationID = allocationID
+ task.allocationKey = allocationKey
task.nodeName = nodeID
if task.placeholder {
log.Log(log.ShimCacheTask).Info("placeholder is bound",
@@ -413,9 +413,9 @@ func (task *Task) postTaskAllocated() {
// If we find the task is already in Completed state while handling
TaskAllocated
// event, we need to explicitly release this allocation because it is no
// longer valid.
-func (task *Task) beforeTaskAllocated(eventSrc string, allocationID string,
nodeID string) {
- // task is allocated on a node with a allocationID set the details in
the task here to allow referencing later.
- task.allocationID = allocationID
+func (task *Task) beforeTaskAllocated(eventSrc string, allocationKey string,
nodeID string) {
+ // task is allocated on a node with a allocationKey set the details in
the task here to allow referencing later.
+ task.allocationKey = allocationKey
task.nodeName = nodeID
// If the task is Completed the pod was deleted on K8s but the core was
not aware yet.
// Notify the core to release this allocation to avoid resource leak.
@@ -423,7 +423,7 @@ func (task *Task) beforeTaskAllocated(eventSrc string,
allocationID string, node
if eventSrc == TaskStates().Completed {
log.Log(log.ShimCacheTask).Info("task is already completed,
invalidate the allocation",
zap.String("currentTaskState", eventSrc),
- zap.String("allocationID", allocationID),
+ zap.String("allocationKey", allocationKey),
zap.String("allocatedNode", nodeID))
task.releaseAllocation()
}
@@ -505,7 +505,7 @@ func (task *Task) releaseAllocation() {
zap.String("applicationID", task.applicationID),
zap.String("taskID", task.taskID),
zap.String("taskAlias", task.alias),
- zap.String("allocationID", task.allocationID),
+ zap.String("allocationKey", task.allocationKey),
zap.String("task", task.GetTaskState()),
zap.String("terminationType", task.terminationType))
@@ -516,17 +516,17 @@ func (task *Task) releaseAllocation() {
s := TaskStates()
switch task.GetTaskState() {
case s.New, s.Pending, s.Scheduling, s.Rejected:
- releaseRequest =
common.CreateReleaseRequestForTask(task.applicationID, task.taskID,
task.allocationID, task.application.partition, task.terminationType)
+ releaseRequest =
common.CreateReleaseRequestForTask(task.applicationID, task.taskID,
task.allocationKey, task.application.partition, task.terminationType)
default:
- if task.allocationID == "" {
- log.Log(log.ShimCacheTask).Warn("BUG: task
allocation allocationID is empty on release",
+ if task.allocationKey == "" {
+ log.Log(log.ShimCacheTask).Warn("BUG: task
allocationKey is empty on release",
zap.String("applicationID",
task.applicationID),
zap.String("taskID", task.taskID),
zap.String("taskAlias", task.alias),
zap.String("task", task.GetTaskState()))
}
releaseRequest = common.CreateReleaseRequestForTask(
- task.applicationID, task.taskID,
task.allocationID, task.application.partition, task.terminationType)
+ task.applicationID, task.taskID,
task.allocationKey, task.application.partition, task.terminationType)
}
if releaseRequest.Releases != nil {
@@ -588,10 +588,10 @@ func (task *Task) UpdatePodCondition(podCondition
*v1.PodCondition) (bool, *v1.P
return false, pod
}
-func (task *Task) setAllocationID(allocationID string) {
+func (task *Task) setAllocationKey(allocationKey string) {
task.lock.Lock()
defer task.lock.Unlock()
- task.allocationID = allocationID
+ task.allocationKey = allocationKey
}
func (task *Task) failWithEvent(errorMessage, actionReason string) {
diff --git a/pkg/cache/task_state.go b/pkg/cache/task_state.go
index f0371bd6..d0ee58ef 100644
--- a/pkg/cache/task_state.go
+++ b/pkg/cache/task_state.go
@@ -127,15 +127,15 @@ type AllocatedTaskEvent struct {
taskID string
event TaskEventType
nodeID string
- allocationID string
+ allocationKey string
}
-func NewAllocateTaskEvent(appID string, taskID string, allocationID string,
nid string) AllocatedTaskEvent {
+func NewAllocateTaskEvent(appID string, taskID string, allocationKey string,
nid string) AllocatedTaskEvent {
return AllocatedTaskEvent{
applicationID: appID,
taskID: taskID,
event: TaskAllocated,
- allocationID: allocationID,
+ allocationKey: allocationKey,
nodeID: nid,
}
}
@@ -146,7 +146,7 @@ func (ae AllocatedTaskEvent) GetEvent() string {
func (ae AllocatedTaskEvent) GetArgs() []interface{} {
args := make([]interface{}, 2)
- args[0] = ae.allocationID
+ args[0] = ae.allocationKey
args[1] = ae.nodeID
return args
}
@@ -419,15 +419,15 @@ func newTaskState() *fsm.FSM {
},
beforeHook(TaskAllocated): func(_ context.Context,
event *fsm.Event) {
task := event.Args[0].(*Task) //nolint:errcheck
- // All allocation events must include the
allocationID and nodeID passed from the core
+ // All allocation events must include the
allocationKey and nodeID passed from the core
eventArgs := make([]string, 2)
if err :=
events.GetEventArgsAsStrings(eventArgs, event.Args[1].([]interface{})); err !=
nil {
log.Log(log.ShimFSM).Error("failed to
parse event arg", zap.Error(err))
return
}
- allocationID := eventArgs[0]
+ allocationKey := eventArgs[0]
nodeID := eventArgs[1]
- task.beforeTaskAllocated(event.Src,
allocationID, nodeID)
+ task.beforeTaskAllocated(event.Src,
allocationKey, nodeID)
},
beforeHook(CompleteTask): func(_ context.Context, event
*fsm.Event) {
task := event.Args[0].(*Task) //nolint:errcheck
diff --git a/pkg/cache/task_test.go b/pkg/cache/task_test.go
index c08c4f03..3b11bbc5 100644
--- a/pkg/cache/task_test.go
+++ b/pkg/cache/task_test.go
@@ -170,7 +170,7 @@ func TestReleaseTaskAllocation(t *testing.T) {
},
ObjectMeta: metav1.ObjectMeta{
Name: "pod-resource-test-00001",
- UID: "UID-00001",
+ UID: "task01",
},
Spec: v1.PodSpec{
Containers: containers,
@@ -201,9 +201,9 @@ func TestReleaseTaskAllocation(t *testing.T) {
assert.Equal(t, task.GetTaskState(), TaskStates().Allocated)
// bind a task is a async process, wait for it to happen
err = common.WaitFor(100*time.Millisecond, 3*time.Second, func() bool {
- return task.getTaskAllocationID() == string(pod.UID)
+ return task.getNodeName() == "node-1"
})
- assert.NilError(t, err, "failed to wait for allocation allocationID
being set for task")
+ assert.NilError(t, err, "failed to wait for allocation allocationKey
being set for task")
// bound
event3 := NewBindTaskEvent(app.applicationID, task.taskID)
@@ -218,7 +218,7 @@ func TestReleaseTaskAllocation(t *testing.T) {
assert.Assert(t, request.Releases.AllocationsToRelease != nil)
assert.Equal(t,
request.Releases.AllocationsToRelease[0].ApplicationID, app.applicationID)
assert.Equal(t,
request.Releases.AllocationsToRelease[0].PartitionName, "default")
- assert.Equal(t,
request.Releases.AllocationsToRelease[0].AllocationID, "UID-00001")
+ assert.Equal(t,
request.Releases.AllocationsToRelease[0].AllocationKey, "task01")
assert.Assert(t, request.Releases.AllocationAsksToRelease !=
nil)
assert.Equal(t,
request.Releases.AllocationAsksToRelease[0].ApplicationID, app.applicationID)
assert.Equal(t,
request.Releases.AllocationAsksToRelease[0].AllocationKey, "task01")
@@ -235,7 +235,7 @@ func TestReleaseTaskAllocation(t *testing.T) {
// 2 updates call, 1 for submit, 1 for release
assert.Equal(t,
mockedApiProvider.GetSchedulerAPIUpdateAllocationCount(), int32(2))
- // New to Failed, no AllocationID is set (only ask is released)
+ // New to Failed, no AllocationKey is set (only ask is released)
task = NewTask("task01", app, mockedContext, pod)
mockedApiProvider.MockSchedulerAPIUpdateAllocationFn(func(request
*si.AllocationRequest) error {
assert.Assert(t, request.Releases != nil)
@@ -250,16 +250,16 @@ func TestReleaseTaskAllocation(t *testing.T) {
err = task.handle(NewFailTaskEvent(app.applicationID, "task01", "test
failure"))
assert.NilError(t, err, "failed to handle FailTask event")
- // Scheduling to Failed, AllocationID is set (ask+allocation are both
released)
+ // Scheduling to Failed, AllocationKey is set (ask+allocation are both
released)
task = NewTask("task01", app, mockedContext, pod)
- task.setAllocationID("alloc-0")
+ task.setAllocationKey("task01")
task.sm.SetState(TaskStates().Scheduling)
mockedApiProvider.MockSchedulerAPIUpdateAllocationFn(func(request
*si.AllocationRequest) error {
assert.Assert(t, request.Releases != nil)
assert.Assert(t, request.Releases.AllocationsToRelease != nil)
assert.Equal(t,
request.Releases.AllocationsToRelease[0].ApplicationID, app.applicationID)
assert.Equal(t,
request.Releases.AllocationsToRelease[0].PartitionName, "default")
- assert.Equal(t,
request.Releases.AllocationsToRelease[0].AllocationID, "alloc-0")
+ assert.Equal(t,
request.Releases.AllocationsToRelease[0].AllocationKey, "task01")
assert.Assert(t, request.Releases.AllocationAsksToRelease !=
nil)
assert.Equal(t,
request.Releases.AllocationAsksToRelease[0].ApplicationID, app.applicationID)
assert.Equal(t,
request.Releases.AllocationAsksToRelease[0].AllocationKey, "task01")
@@ -618,10 +618,9 @@ func TestHandleSubmitTaskEvent(t *testing.T) {
func TestSimultaneousTaskCompleteAndAllocate(t *testing.T) {
const (
- podUID = "UID-00001"
- appID = "app-test-001"
- queueName = "root.abc"
- allocationID = "allocationid-xyz"
+ podUID = "UID-00001"
+ appID = "app-test-001"
+ queueName = "root.abc"
)
mockedContext := initContextForTest()
mockedAPIProvider, ok :=
mockedContext.apiProvider.(*client.MockedAPIProvider)
@@ -679,7 +678,6 @@ func TestSimultaneousTaskCompleteAndAllocate(t *testing.T) {
// can be released from the core to avoid resource leak
alloc := &si.Allocation{
AllocationKey: string(pod1.UID),
- AllocationID: allocationID,
NodeID: "fake-node",
ApplicationID: appID,
PartitionName: "default",
@@ -691,10 +689,10 @@ func TestSimultaneousTaskCompleteAndAllocate(t
*testing.T) {
"allocationsToRelease is not in the expected length")
allocToRelease := request.Releases.AllocationsToRelease[0]
assert.Equal(t, allocToRelease.ApplicationID,
alloc.ApplicationID)
- assert.Equal(t, allocToRelease.AllocationID, alloc.AllocationID)
+ assert.Equal(t, allocToRelease.AllocationKey,
alloc.AllocationKey)
return nil
})
- ev1 := NewAllocateTaskEvent(app.GetApplicationID(),
alloc.AllocationKey, alloc.AllocationID, alloc.NodeID)
+ ev1 := NewAllocateTaskEvent(app.GetApplicationID(),
alloc.AllocationKey, alloc.AllocationKey, alloc.NodeID)
err = task1.handle(ev1)
assert.NilError(t, err, "failed to handle AllocateTask event")
assert.Equal(t, task1.GetTaskState(), TaskStates().Completed)
diff --git a/pkg/cache/utils_test.go b/pkg/cache/utils_test.go
index 5c79f8cc..1e527399 100644
--- a/pkg/cache/utils_test.go
+++ b/pkg/cache/utils_test.go
@@ -30,9 +30,8 @@ import (
)
const (
- appID = "app01"
- app2ID = "app02"
- taskAllocationID = "ALLOCATIONID01"
+ appID = "app01"
+ app2ID = "app02"
)
//nolint:funlen
diff --git a/pkg/common/si_helper.go b/pkg/common/si_helper.go
index b679a844..9c4520a0 100644
--- a/pkg/common/si_helper.go
+++ b/pkg/common/si_helper.go
@@ -95,7 +95,6 @@ func CreateAllocationForTask(appID, taskID, nodeID string,
resource *si.Resource
allocation := si.Allocation{
AllocationKey: taskID,
AllocationTags: CreateTagsForTask(pod),
- AllocationID: taskID,
ResourcePerAlloc: resource,
Priority: CreatePriorityForTask(pod),
NodeID: nodeID,
@@ -122,13 +121,13 @@ func GetTerminationTypeFromString(terminationTypeStr
string) si.TerminationType
return si.TerminationType_STOPPED_BY_RM
}
-func CreateReleaseRequestForTask(appID, taskID, allocationID, partition,
terminationType string) *si.AllocationRequest {
+func CreateReleaseRequestForTask(appID, taskID, allocationKey, partition,
terminationType string) *si.AllocationRequest {
var allocToRelease []*si.AllocationRelease
- if allocationID != "" {
+ if allocationKey != "" {
allocToRelease = make([]*si.AllocationRelease, 1)
allocToRelease[0] = &si.AllocationRelease{
ApplicationID: appID,
- AllocationID: allocationID,
+ AllocationKey: allocationKey,
PartitionName: partition,
TerminationType:
GetTerminationTypeFromString(terminationType),
Message: "task completed",
diff --git a/pkg/common/si_helper_test.go b/pkg/common/si_helper_test.go
index 04ac266e..67be6147 100644
--- a/pkg/common/si_helper_test.go
+++ b/pkg/common/si_helper_test.go
@@ -32,21 +32,21 @@ import (
const nodeID = "node-01"
func TestCreateReleaseRequestForTask(t *testing.T) {
- // with "allocationID"
- request := CreateReleaseRequestForTask("app01", "task01", "alloc01",
"default", "STOPPED_BY_RM")
+ // with allocationKey
+ request := CreateReleaseRequestForTask("app01", "task01", "task01",
"default", "STOPPED_BY_RM")
assert.Assert(t, request.Releases != nil)
assert.Assert(t, request.Releases.AllocationsToRelease != nil)
assert.Assert(t, request.Releases.AllocationAsksToRelease != nil)
assert.Equal(t, len(request.Releases.AllocationsToRelease), 1)
assert.Equal(t, len(request.Releases.AllocationAsksToRelease), 1)
assert.Equal(t, request.Releases.AllocationsToRelease[0].ApplicationID,
"app01")
- assert.Equal(t, request.Releases.AllocationsToRelease[0].AllocationID,
"alloc01")
+ assert.Equal(t, request.Releases.AllocationsToRelease[0].AllocationKey,
"task01")
assert.Equal(t, request.Releases.AllocationsToRelease[0].PartitionName,
"default")
assert.Equal(t,
request.Releases.AllocationAsksToRelease[0].ApplicationID, "app01")
assert.Equal(t,
request.Releases.AllocationAsksToRelease[0].AllocationKey, "task01")
assert.Equal(t,
request.Releases.AllocationAsksToRelease[0].PartitionName, "default")
- // without allocationID
+ // without allocationKey
request = CreateReleaseRequestForTask("app01", "task01", "", "default",
"STOPPED_BY_RM")
assert.Assert(t, request.Releases != nil)
assert.Assert(t, request.Releases.AllocationsToRelease == nil)
diff --git a/pkg/common/test/recoverable_apps_mock.go
b/pkg/common/test/recoverable_apps_mock.go
index 3ebb1b12..9efdd268 100644
--- a/pkg/common/test/recoverable_apps_mock.go
+++ b/pkg/common/test/recoverable_apps_mock.go
@@ -38,9 +38,8 @@ func (m *MockedRecoverableAppManager) ListPods() ([]*v1.Pod,
error) {
func (m *MockedRecoverableAppManager) GetExistingAllocation(pod *v1.Pod)
*si.Allocation {
return &si.Allocation{
- AllocationKey: pod.Name,
+ AllocationKey: string(pod.UID),
AllocationTags: nil,
- AllocationID: string(pod.UID),
ResourcePerAlloc: nil,
Priority: 0,
NodeID: pod.Spec.NodeName,
diff --git a/test/e2e/basic_scheduling/basic_scheduling_test.go
b/test/e2e/basic_scheduling/basic_scheduling_test.go
index 4a7de566..9ca63d7f 100644
--- a/test/e2e/basic_scheduling/basic_scheduling_test.go
+++ b/test/e2e/basic_scheduling/basic_scheduling_test.go
@@ -111,7 +111,6 @@ var _ = ginkgo.Describe("", func() {
gomega.Ω(allocation.AllocationKey).NotTo(gomega.BeNil())
gomega.Ω(allocation.NodeID).NotTo(gomega.BeNil())
gomega.Ω(allocation.Partition).NotTo(gomega.BeNil())
- gomega.Ω(allocation.AllocationID).NotTo(gomega.BeNil())
gomega.Ω(allocation.ApplicationID).To(gomega.Equal(sleepRespPod.ObjectMeta.Labels["applicationId"]))
core :=
sleepRespPod.Spec.Containers[0].Resources.Requests.Cpu().MilliValue()
mem :=
sleepRespPod.Spec.Containers[0].Resources.Requests.Memory().Value()
diff --git a/test/e2e/recovery_and_restart/recovery_and_restart_test.go
b/test/e2e/recovery_and_restart/recovery_and_restart_test.go
index 0492890d..5e89952f 100644
--- a/test/e2e/recovery_and_restart/recovery_and_restart_test.go
+++ b/test/e2e/recovery_and_restart/recovery_and_restart_test.go
@@ -142,7 +142,6 @@ var _ = ginkgo.Describe("", func() {
gomega.Ω(allocations.AllocationKey).NotTo(gomega.BeNil())
gomega.Ω(allocations.NodeID).NotTo(gomega.BeNil())
gomega.Ω(allocations.Partition).NotTo(gomega.BeNil())
- gomega.Ω(allocations.AllocationID).NotTo(gomega.BeNil())
gomega.Ω(allocations.ApplicationID).To(gomega.Equal(sleepRespPod.ObjectMeta.Labels["applicationId"]))
core :=
sleepRespPod.Spec.Containers[0].Resources.Requests.Cpu().MilliValue()
mem :=
sleepRespPod.Spec.Containers[0].Resources.Requests.Memory().Value()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]