This is an automated email from the ASF dual-hosted git repository.
ccondit pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-k8shim.git
The following commit(s) were added to refs/heads/master by this push:
new 2278b321 [YUNIKORN-2779] Shim: Use UpdateAllocation for both asks and allocations (#891)
2278b321 is described below
commit 2278b3217c702ccb796e4d623bc7837625e5a4ec
Author: Craig Condit <[email protected]>
AuthorDate: Fri Aug 16 09:55:52 2024 -0500
[YUNIKORN-2779] Shim: Use UpdateAllocation for both asks and allocations (#891)
Closes: #891
---
go.mod | 4 +-
go.sum | 8 +--
pkg/cache/application.go | 35 +++---------
pkg/cache/application_state.go | 64 ++--------------------
pkg/cache/application_state_test.go | 100 -----------------------------------
pkg/cache/scheduler_callback.go | 19 -------
pkg/cache/scheduler_callback_test.go | 48 -----------------
pkg/cache/task.go | 57 +++++++-------------
pkg/cache/task_test.go | 59 ++++++++-------------
pkg/common/si_helper.go | 40 +++++---------
pkg/common/si_helper_test.go | 40 ++++----------
11 files changed, 80 insertions(+), 394 deletions(-)
diff --git a/go.mod b/go.mod
index 349820d3..685499e7 100644
--- a/go.mod
+++ b/go.mod
@@ -21,8 +21,8 @@ module github.com/apache/yunikorn-k8shim
go 1.21
require (
- github.com/apache/yunikorn-core v0.0.0-20240802210614-4aec626c6bf9
- github.com/apache/yunikorn-scheduler-interface
v0.0.0-20240731203810-92032b13d586
+ github.com/apache/yunikorn-core v0.0.0-20240815214512-f51aaba68ff2
+ github.com/apache/yunikorn-scheduler-interface
v0.0.0-20240815142741-38a38685cd4e
github.com/google/go-cmp v0.6.0
github.com/google/uuid v1.6.0
github.com/looplab/fsm v1.0.1
diff --git a/go.sum b/go.sum
index 7408b97b..0c702f23 100644
--- a/go.sum
+++ b/go.sum
@@ -9,10 +9,10 @@ github.com/NYTimes/gziphandler v1.1.1
h1:ZUDjpQae29j0ryrS0u/B8HZfJBtBQHjqw2rQ2cq
github.com/NYTimes/gziphandler v1.1.1/go.mod
h1:n/CVRwUEOgIxrgPvAQhUUr9oeUtvrhMomdKFjzJNB0c=
github.com/antlr/antlr4/runtime/Go/antlr/v4 v4.0.0-20230305170008-8188dc5388df
h1:7RFfzj4SSt6nnvCPbCqijJi1nWCd+TqAT3bYCStRC18=
github.com/antlr/antlr4/runtime/Go/antlr/v4
v4.0.0-20230305170008-8188dc5388df/go.mod
h1:pSwJ0fSY5KhvocuWSx4fz3BA8OrA1bQn+K1Eli3BRwM=
-github.com/apache/yunikorn-core v0.0.0-20240802210614-4aec626c6bf9
h1:s1Co/K+cR9Q/GW0e974dToW9eyLQZxYoCp0TCoEuEj0=
-github.com/apache/yunikorn-core v0.0.0-20240802210614-4aec626c6bf9/go.mod
h1:S9yGBGA2i2hAajtEc2t4lmiPJDZz3Ek8eVxz5KhJqGI=
-github.com/apache/yunikorn-scheduler-interface
v0.0.0-20240731203810-92032b13d586
h1:ZVpo9Qj2/gvwX6Rl44UxkZBm2pZWEJDYWTramc9hwF0=
-github.com/apache/yunikorn-scheduler-interface
v0.0.0-20240731203810-92032b13d586/go.mod
h1:WuHJpVk34t8N5+1ErYGj/5Qq33/cRzL4YtuoAsbMtWc=
+github.com/apache/yunikorn-core v0.0.0-20240815214512-f51aaba68ff2
h1:m1kxL2ce3QfHOsYl5D+AfHn7xjFxP40b88na/7qzmS8=
+github.com/apache/yunikorn-core v0.0.0-20240815214512-f51aaba68ff2/go.mod
h1:QHKfJ2RyZuQnZg28SnypmnvFxN/zfoYf+hmfxiVdq5g=
+github.com/apache/yunikorn-scheduler-interface
v0.0.0-20240815142741-38a38685cd4e
h1:ZOLst6ROwUrgoUQbEdYaz28iKuiU5YNYGtelKsTFhqw=
+github.com/apache/yunikorn-scheduler-interface
v0.0.0-20240815142741-38a38685cd4e/go.mod
h1:WuHJpVk34t8N5+1ErYGj/5Qq33/cRzL4YtuoAsbMtWc=
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5
h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio=
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod
h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs=
github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a
h1:idn718Q4B6AGu/h5Sxe66HYVdqdGu2l9Iebqhi/AEoA=
diff --git a/pkg/cache/application.go b/pkg/cache/application.go
index a6ec1f3f..085465ce 100644
--- a/pkg/cache/application.go
+++ b/pkg/cache/application.go
@@ -627,42 +627,19 @@ func (app *Application) handleFailApplicationEvent(errMsg
string) {
}
}
-func (app *Application) handleReleaseAppAllocationEvent(allocationKey string,
terminationType string) {
- log.Log(log.ShimCacheApplication).Info("try to release pod from
application",
- zap.String("appID", app.applicationID),
- zap.String("allocationKey", allocationKey),
- zap.String("terminationType", terminationType))
-
- for _, task := range app.taskMap {
- if task.allocationKey == allocationKey {
- task.setTaskTerminationType(terminationType)
- err := task.DeleteTaskPod()
- if err != nil {
- log.Log(log.ShimCacheApplication).Error("failed
to release allocation from application", zap.Error(err))
- }
- app.publishPlaceholderTimeoutEvents(task)
- }
- }
-}
-
-func (app *Application) handleReleaseAppAllocationAskEvent(taskID string,
terminationType string) {
+func (app *Application) handleReleaseAppAllocationEvent(taskID string,
terminationType string) {
log.Log(log.ShimCacheApplication).Info("try to release pod from
application",
zap.String("appID", app.applicationID),
zap.String("taskID", taskID),
zap.String("terminationType", terminationType))
+
if task, ok := app.taskMap[taskID]; ok {
task.setTaskTerminationType(terminationType)
- if task.IsPlaceholder() {
- err := task.DeleteTaskPod()
- if err != nil {
- log.Log(log.ShimCacheApplication).Error("failed
to release allocation ask from application", zap.Error(err))
- }
- app.publishPlaceholderTimeoutEvents(task)
- } else {
- log.Log(log.ShimCacheApplication).Warn("skip to release
allocation ask, ask is not a placeholder",
- zap.String("appID", app.applicationID),
- zap.String("taskID", taskID))
+ err := task.DeleteTaskPod()
+ if err != nil {
+ log.Log(log.ShimCacheApplication).Error("failed to
release allocation from application", zap.Error(err))
}
+ app.publishPlaceholderTimeoutEvents(task)
} else {
log.Log(log.ShimCacheApplication).Warn("task not found",
zap.String("appID", app.applicationID),
diff --git a/pkg/cache/application_state.go b/pkg/cache/application_state.go
index aee1c3e1..59541b24 100644
--- a/pkg/cache/application_state.go
+++ b/pkg/cache/application_state.go
@@ -49,13 +49,12 @@ const (
KillApplication
KilledApplication
ReleaseAppAllocation
- ReleaseAppAllocationAsk
ResumingApplication
AppTaskCompleted
)
func (ae ApplicationEventType) String() string {
- return [...]string{"SubmitApplication", "AcceptApplication",
"TryReserve", "UpdateReservation", "RunApplication", "RejectApplication",
"CompleteApplication", "FailApplication", "KillApplication",
"KilledApplication", "ReleaseAppAllocation", "ReleaseAppAllocationAsk",
"ResumingApplication", "AppTaskCompleted"}[ae]
+ return [...]string{"SubmitApplication", "AcceptApplication",
"TryReserve", "UpdateReservation", "RunApplication", "RejectApplication",
"CompleteApplication", "FailApplication", "KillApplication",
"KilledApplication", "ReleaseAppAllocation", "ResumingApplication",
"AppTaskCompleted"}[ae]
}
// ------------------------
@@ -295,37 +294,6 @@ func (re ReleaseAppAllocationEvent) GetEvent() string {
return re.event.String()
}
-type ReleaseAppAllocationAskEvent struct {
- applicationID string
- taskID string
- terminationType string
- event ApplicationEventType
-}
-
-func NewReleaseAppAllocationAskEvent(appID string, allocTermination
si.TerminationType, taskID string) ReleaseAppAllocationAskEvent {
- return ReleaseAppAllocationAskEvent{
- applicationID: appID,
- taskID: taskID,
- terminationType:
si.TerminationType_name[int32(allocTermination)],
- event: ReleaseAppAllocationAsk,
- }
-}
-
-func (re ReleaseAppAllocationAskEvent) GetApplicationID() string {
- return re.applicationID
-}
-
-func (re ReleaseAppAllocationAskEvent) GetArgs() []interface{} {
- args := make([]interface{}, 2)
- args[0] = re.taskID
- args[1] = re.terminationType
- return args
-}
-
-func (re ReleaseAppAllocationAskEvent) GetEvent() string {
- return re.event.String()
-}
-
// ------------------------
// Resuming application
// ------------------------
@@ -434,7 +402,7 @@ func newAppState() *fsm.FSM { //nolint:funlen
},
{
Name: ReleaseAppAllocation.String(),
- Src: []string{states.Running},
+ Src: []string{states.Running, states.Accepted,
states.Reserving},
Dst: states.Running,
},
{
@@ -447,21 +415,6 @@ func newAppState() *fsm.FSM { //nolint:funlen
Src: []string{states.Resuming},
Dst: states.Resuming,
},
- {
- Name: ReleaseAppAllocationAsk.String(),
- Src: []string{states.Running, states.Accepted,
states.Reserving},
- Dst: states.Running,
- },
- {
- Name: ReleaseAppAllocationAsk.String(),
- Src: []string{states.Failing},
- Dst: states.Failing,
- },
- {
- Name: ReleaseAppAllocationAsk.String(),
- Src: []string{states.Resuming},
- Dst: states.Resuming,
- },
{
Name: CompleteApplication.String(),
Src: []string{states.Running},
@@ -543,17 +496,6 @@ func newAppState() *fsm.FSM { //nolint:funlen
app.onReservationStateChange()
},
ReleaseAppAllocation.String(): func(_ context.Context,
event *fsm.Event) {
- app := event.Args[0].(*Application)
//nolint:errcheck
- eventArgs := make([]string, 2)
- if err :=
events.GetEventArgsAsStrings(eventArgs, event.Args[1].([]interface{})); err !=
nil {
- log.Log(log.ShimFSM).Error("fail to
parse event arg", zap.Error(err))
- return
- }
- allocationKey := eventArgs[0]
- terminationType := eventArgs[1]
-
app.handleReleaseAppAllocationEvent(allocationKey, terminationType)
- },
- ReleaseAppAllocationAsk.String(): func(_
context.Context, event *fsm.Event) {
app := event.Args[0].(*Application)
//nolint:errcheck
eventArgs := make([]string, 2)
if err :=
events.GetEventArgsAsStrings(eventArgs, event.Args[1].([]interface{})); err !=
nil {
@@ -562,7 +504,7 @@ func newAppState() *fsm.FSM { //nolint:funlen
}
taskID := eventArgs[0]
terminationType := eventArgs[1]
- app.handleReleaseAppAllocationAskEvent(taskID,
terminationType)
+ app.handleReleaseAppAllocationEvent(taskID,
terminationType)
},
AppTaskCompleted.String(): func(_ context.Context,
event *fsm.Event) {
app := event.Args[0].(*Application)
//nolint:errcheck
diff --git a/pkg/cache/application_state_test.go
b/pkg/cache/application_state_test.go
index d16e3012..75626bd9 100644
--- a/pkg/cache/application_state_test.go
+++ b/pkg/cache/application_state_test.go
@@ -769,105 +769,6 @@ func TestReleaseAppAllocationEventGetApplicationID(t
*testing.T) {
}
}
-func TestNewReleaseAppAllocationAskEvent(t *testing.T) {
- tests := []struct {
- name string
- appID, taskID string
- terminationType si.TerminationType
- wantID, wantTaskID, wantType string
- wantEvent ApplicationEventType
- }{
- {TestCreateName, "testAppId001", "testTaskId001",
si.TerminationType_TIMEOUT, "testAppId001", "testTaskId001", "TIMEOUT",
ReleaseAppAllocationAsk},
- }
-
- for _, tt := range tests {
- instance := NewReleaseAppAllocationAskEvent(tt.appID,
tt.terminationType, tt.taskID)
- t.Run(tt.name, func(t *testing.T) {
- if instance.applicationID != tt.wantID ||
instance.taskID != tt.taskID || instance.terminationType != tt.wantType ||
instance.event != tt.wantEvent {
- t.Errorf("want %s %s %s %s, got %s %s %s %s",
- tt.wantID, tt.taskID, tt.wantType,
tt.wantEvent,
- instance.applicationID,
instance.taskID, instance.terminationType, instance.event)
- }
- })
- }
-}
-
-func TestReleaseAppAllocationAskEventGetEvent(t *testing.T) {
- tests := []struct {
- name string
- appID, taskID string
- terminationType si.TerminationType
- wantEvent ApplicationEventType
- }{
- {TestEventName, "testAppId001", "testTaskId001",
si.TerminationType_TIMEOUT, ReleaseAppAllocationAsk},
- }
-
- for _, tt := range tests {
- instance := NewReleaseAppAllocationAskEvent(tt.appID,
tt.terminationType, tt.taskID)
- event := instance.GetEvent()
- t.Run(tt.name, func(t *testing.T) {
- if event != tt.wantEvent.String() {
- t.Errorf("want %s, got %s", tt.wantEvent, event)
- }
- })
- }
-}
-
-func TestReleaseAppAllocationAskEventGetArgs(t *testing.T) {
- tests := []struct {
- name string
- appID, taskID string
- terminationType si.TerminationType
- wantLen int
- wantTaskID, wantType string
- castOk []bool
- wantArg []string
- }{
- {TestArgsName, "testAppId001", "testTaskId001",
si.TerminationType_TIMEOUT, 2, "testTaskId001", "TIMEOUT", []bool{true, true},
[]string{"testTaskId001", "TIMEOUT"}},
- }
-
- for _, tt := range tests {
- instance := NewReleaseAppAllocationAskEvent(tt.appID,
tt.terminationType, tt.taskID)
- args := instance.GetArgs()
- t.Run(tt.name, func(t *testing.T) {
- if len(args) != tt.wantLen {
- t.Errorf("want %d, got %d", tt.wantLen,
len(args))
-
- for index, arg := range args {
- info, ok := arg.(string)
- if ok != tt.castOk[index] {
- t.Errorf("want %v, got %v",
tt.castOk[index], ok)
- }
- if info != tt.wantArg[index] {
- t.Errorf("want %s, got %s",
tt.wantArg[index], info)
- }
- }
- }
- })
- }
-}
-
-func TestReleaseAppAllocationAskEventGetApplicationID(t *testing.T) {
- tests := []struct {
- name string
- appID, taskID string
- terminationType si.TerminationType
- wantID string
- }{
- {TestAppIDName, "testAppId001", "testTaskId001",
si.TerminationType_TIMEOUT, "testAppId001"},
- }
-
- for _, tt := range tests {
- instance := NewReleaseAppAllocationAskEvent(tt.appID,
tt.terminationType, tt.taskID)
- appID := instance.GetApplicationID()
- t.Run(tt.name, func(t *testing.T) {
- if appID != tt.wantID {
- t.Errorf("want %s, got %s", tt.wantID, appID)
- }
- })
- }
-}
-
func TestNewResumingApplicationEvent(t *testing.T) {
tests := []struct {
name string
@@ -902,7 +803,6 @@ func TestApplicationEventsAsString(t *testing.T) {
assert.Equal(t, KillApplication.String(), "KillApplication")
assert.Equal(t, KilledApplication.String(), "KilledApplication")
assert.Equal(t, ReleaseAppAllocation.String(), "ReleaseAppAllocation")
- assert.Equal(t, ReleaseAppAllocationAsk.String(),
"ReleaseAppAllocationAsk")
assert.Equal(t, ResumingApplication.String(), "ResumingApplication")
assert.Equal(t, AppTaskCompleted.String(), "AppTaskCompleted")
}
diff --git a/pkg/cache/scheduler_callback.go b/pkg/cache/scheduler_callback.go
index 728212bf..0c43b34d 100644
--- a/pkg/cache/scheduler_callback.go
+++ b/pkg/cache/scheduler_callback.go
@@ -77,15 +77,6 @@ func (callback *AsyncRMCallback) UpdateAllocation(response
*si.AllocationRespons
}
}
- for _, reject := range response.Rejected {
- // request rejected by the scheduler, put it back and try
scheduling again
- log.Log(log.ShimRMCallback).Debug("callback: response to
rejected ask",
- zap.String("allocationKey", reject.AllocationKey))
- dispatcher.Dispatch(NewRejectTaskEvent(reject.ApplicationID,
reject.AllocationKey,
- fmt.Sprintf("task %s ask from application %s is
rejected by scheduler",
- reject.AllocationKey, reject.ApplicationID)))
- }
-
for _, reject := range response.RejectedAllocations {
// request rejected by the scheduler, reject it
log.Log(log.ShimRMCallback).Debug("callback: response to
rejected allocation",
@@ -110,16 +101,6 @@ func (callback *AsyncRMCallback) UpdateAllocation(response
*si.AllocationRespons
}
}
- for _, ask := range response.ReleasedAsks {
- log.Log(log.ShimRMCallback).Debug("callback: response to
released allocations",
- zap.String("allocation key", ask.AllocationKey))
-
- if ask.TerminationType == si.TerminationType_TIMEOUT {
- ev :=
NewReleaseAppAllocationAskEvent(ask.ApplicationID, ask.TerminationType,
ask.AllocationKey)
- dispatcher.Dispatch(ev)
- }
- }
-
return nil
}
diff --git a/pkg/cache/scheduler_callback_test.go
b/pkg/cache/scheduler_callback_test.go
index 59bd875e..d98f5f6f 100644
--- a/pkg/cache/scheduler_callback_test.go
+++ b/pkg/cache/scheduler_callback_test.go
@@ -133,27 +133,6 @@ func TestUpdateAllocation_NewTask_PodAlreadyAssigned(t
*testing.T) {
assert.Equal(t, TaskSchedAllocated, task.schedulingState)
}
-func TestUpdateAllocation_AskRejected(t *testing.T) {
- callback, context := initCallbackTest(t, false, false)
- defer dispatcher.UnregisterAllEventHandlers()
- defer dispatcher.Stop()
-
- err := callback.UpdateAllocation(&si.AllocationResponse{
- Rejected: []*si.RejectedAllocationAsk{
- {
- ApplicationID: appID,
- AllocationKey: taskUID1,
- },
- },
- })
- assert.NilError(t, err, "error updating allocation")
- task := context.getTask(appID, taskUID1)
- err = utils.WaitForCondition(func() bool {
- return task.GetTaskState() == TaskStates().Failed
- }, 10*time.Millisecond, time.Second)
- assert.NilError(t, err, "task has not transitioned to Failed state")
-}
-
func TestUpdateAllocation_AllocationRejected(t *testing.T) {
callback, context := initCallbackTest(t, false, false)
defer dispatcher.UnregisterAllEventHandlers()
@@ -239,33 +218,6 @@ func TestUpdateAllocation_AllocationReleased_StoppedByRM(t
*testing.T) {
assert.Error(t, err, "timeout waiting for condition") // pod is not
expected to be deleted
}
-func TestUpdateAllocation_AskReleased(t *testing.T) {
- callback, context := initCallbackTest(t, false, true)
- defer dispatcher.UnregisterAllEventHandlers()
- defer dispatcher.Stop()
- app := context.getApplication(appID)
- app.sm.SetState(ApplicationStates().Running)
- var deleteCalled atomic.Bool
- context.apiProvider.(*client.MockedAPIProvider).MockDeleteFn(func(pod
*v1.Pod) error { //nolint:errcheck
- deleteCalled.Store(true)
- return nil
- })
-
- err := callback.UpdateAllocation(&si.AllocationResponse{
- ReleasedAsks: []*si.AllocationAskRelease{
- {
- ApplicationID: appID,
- AllocationKey: taskUID1,
- TerminationType: si.TerminationType_TIMEOUT,
- },
- },
- })
- assert.NilError(t, err, "error updating allocation")
- assert.Assert(t, !context.schedulerCache.IsAssumedPod(taskUID1))
- err = utils.WaitForCondition(deleteCalled.Load, 10*time.Millisecond,
time.Second)
- assert.NilError(t, err, "pod has not been deleted")
-}
-
func TestUpdateApplication_Accepted(t *testing.T) {
callback, context := initCallbackTest(t, false, false)
defer dispatcher.UnregisterAllEventHandlers()
diff --git a/pkg/cache/task.go b/pkg/cache/task.go
index 02b07d16..c7773965 100644
--- a/pkg/cache/task.go
+++ b/pkg/cache/task.go
@@ -292,40 +292,25 @@ func (task *Task) handleSubmitTaskEvent() {
AllowPreemptOther: task.isPreemptOtherAllowed(),
}
- if utils.PodAlreadyBound(task.pod) {
- // submit allocation
- rr := common.CreateAllocationForTask(
- task.applicationID,
- task.taskID,
- task.pod.Spec.NodeName,
- task.resource,
- task.placeholder,
- task.taskGroupName,
- task.pod,
- task.originator,
- preemptionPolicy)
- log.Log(log.ShimCacheTask).Debug("send update request",
zap.Stringer("request", rr))
- if err :=
task.context.apiProvider.GetAPIs().SchedulerAPI.UpdateAllocation(rr); err !=
nil {
- log.Log(log.ShimCacheTask).Debug("failed to send
allocation to scheduler", zap.Error(err))
- return
- }
- } else {
- // submit allocation ask
- rr := common.CreateAllocationRequestForTask(
- task.applicationID,
- task.taskID,
- task.resource,
- task.placeholder,
- task.taskGroupName,
- task.pod,
- task.originator,
- preemptionPolicy)
- log.Log(log.ShimCacheTask).Debug("send update request",
zap.Stringer("request", rr))
- if err :=
task.context.apiProvider.GetAPIs().SchedulerAPI.UpdateAllocation(rr); err !=
nil {
- log.Log(log.ShimCacheTask).Debug("failed to send
scheduling request to scheduler", zap.Error(err))
- return
- }
+ // submit allocation ask
+ rr := common.CreateAllocationForTask(
+ task.applicationID,
+ task.taskID,
+ task.pod.Spec.NodeName,
+ task.resource,
+ task.placeholder,
+ task.taskGroupName,
+ task.pod,
+ task.originator,
+ preemptionPolicy)
+ log.Log(log.ShimCacheTask).Debug("send update request",
zap.Stringer("request", rr))
+ if err :=
task.context.apiProvider.GetAPIs().SchedulerAPI.UpdateAllocation(rr); err !=
nil {
+ log.Log(log.ShimCacheTask).Debug("failed to send scheduling
request to scheduler", zap.Error(err))
+ return
+ }
+ if !utils.PodAlreadyBound(task.pod) {
+ // if this is a new request, add events to pod
events.GetRecorder().Eventf(task.pod.DeepCopy(), nil,
v1.EventTypeNormal, "Scheduling", "Scheduling",
"%s is queued and waiting for allocation", task.alias)
// if this task belongs to a task group, that means the app has
gang scheduling enabled
@@ -516,7 +501,7 @@ func (task *Task) releaseAllocation() {
s := TaskStates()
switch task.GetTaskState() {
case s.New, s.Pending, s.Scheduling, s.Rejected:
- releaseRequest =
common.CreateReleaseRequestForTask(task.applicationID, task.taskID,
task.allocationKey, task.application.partition, task.terminationType)
+ releaseRequest =
common.CreateReleaseRequestForTask(task.applicationID, task.taskID,
task.application.partition, task.terminationType)
default:
if task.allocationKey == "" {
log.Log(log.ShimCacheTask).Warn("BUG: task
allocationKey is empty on release",
@@ -525,13 +510,11 @@ func (task *Task) releaseAllocation() {
zap.String("taskAlias", task.alias),
zap.String("task", task.GetTaskState()))
}
- releaseRequest = common.CreateReleaseRequestForTask(
- task.applicationID, task.taskID,
task.allocationKey, task.application.partition, task.terminationType)
+ releaseRequest =
common.CreateReleaseRequestForTask(task.applicationID, task.taskID,
task.application.partition, task.terminationType)
}
if releaseRequest.Releases != nil {
log.Log(log.ShimCacheTask).Info("releasing allocations",
- zap.Int("numOfAsksToRelease",
len(releaseRequest.Releases.AllocationAsksToRelease)),
zap.Int("numOfAllocationsToRelease",
len(releaseRequest.Releases.AllocationsToRelease)))
}
if err :=
task.context.apiProvider.GetAPIs().SchedulerAPI.UpdateAllocation(releaseRequest);
err != nil {
diff --git a/pkg/cache/task_test.go b/pkg/cache/task_test.go
index 2e2faf8f..1ec5f7d8 100644
--- a/pkg/cache/task_test.go
+++ b/pkg/cache/task_test.go
@@ -224,11 +224,6 @@ func TestReleaseTaskAllocation(t *testing.T) {
assert.Equal(t,
request.Releases.AllocationsToRelease[0].ApplicationID, app.applicationID)
assert.Equal(t,
request.Releases.AllocationsToRelease[0].PartitionName, "default")
assert.Equal(t,
request.Releases.AllocationsToRelease[0].AllocationKey, "task01")
- assert.Assert(t, request.Releases.AllocationAsksToRelease !=
nil)
- assert.Equal(t,
request.Releases.AllocationAsksToRelease[0].ApplicationID, app.applicationID)
- assert.Equal(t,
request.Releases.AllocationAsksToRelease[0].AllocationKey, "task01")
- assert.Equal(t,
request.Releases.AllocationAsksToRelease[0].PartitionName, "default")
- assert.Equal(t,
request.Releases.AllocationAsksToRelease[0].TerminationType,
si.TerminationType_UNKNOWN_TERMINATION_TYPE)
return nil
})
@@ -244,12 +239,11 @@ func TestReleaseTaskAllocation(t *testing.T) {
task = NewTask("task01", app, mockedContext, pod)
mockedApiProvider.MockSchedulerAPIUpdateAllocationFn(func(request
*si.AllocationRequest) error {
assert.Assert(t, request.Releases != nil)
- assert.Assert(t, request.Releases.AllocationsToRelease == nil)
- assert.Assert(t, request.Releases.AllocationAsksToRelease !=
nil)
- assert.Equal(t,
request.Releases.AllocationAsksToRelease[0].ApplicationID, app.applicationID)
- assert.Equal(t,
request.Releases.AllocationAsksToRelease[0].AllocationKey, "task01")
- assert.Equal(t,
request.Releases.AllocationAsksToRelease[0].PartitionName, "default")
- assert.Equal(t,
request.Releases.AllocationAsksToRelease[0].TerminationType,
si.TerminationType_UNKNOWN_TERMINATION_TYPE)
+ assert.Assert(t, request.Releases.AllocationsToRelease != nil)
+ assert.Equal(t,
request.Releases.AllocationsToRelease[0].ApplicationID, app.applicationID)
+ assert.Equal(t,
request.Releases.AllocationsToRelease[0].AllocationKey, "task01")
+ assert.Equal(t,
request.Releases.AllocationsToRelease[0].PartitionName, "default")
+ assert.Equal(t,
request.Releases.AllocationsToRelease[0].TerminationType,
si.TerminationType_STOPPED_BY_RM)
return nil
})
err = task.handle(NewFailTaskEvent(app.applicationID, "task01", "test
failure"))
@@ -265,11 +259,7 @@ func TestReleaseTaskAllocation(t *testing.T) {
assert.Equal(t,
request.Releases.AllocationsToRelease[0].ApplicationID, app.applicationID)
assert.Equal(t,
request.Releases.AllocationsToRelease[0].PartitionName, "default")
assert.Equal(t,
request.Releases.AllocationsToRelease[0].AllocationKey, "task01")
- assert.Assert(t, request.Releases.AllocationAsksToRelease !=
nil)
- assert.Equal(t,
request.Releases.AllocationAsksToRelease[0].ApplicationID, app.applicationID)
- assert.Equal(t,
request.Releases.AllocationAsksToRelease[0].AllocationKey, "task01")
- assert.Equal(t,
request.Releases.AllocationAsksToRelease[0].PartitionName, "default")
- assert.Equal(t,
request.Releases.AllocationAsksToRelease[0].TerminationType,
si.TerminationType_UNKNOWN_TERMINATION_TYPE)
+ assert.Equal(t,
request.Releases.AllocationsToRelease[0].TerminationType,
si.TerminationType_STOPPED_BY_RM)
return nil
})
err = task.handle(NewFailTaskEvent(app.applicationID, "task01", "test
failure"))
@@ -330,11 +320,10 @@ func TestReleaseTaskAsk(t *testing.T) {
// this is to verify we are sending correct info to the scheduler core
mockedApiProvider.MockSchedulerAPIUpdateAllocationFn(func(request
*si.AllocationRequest) error {
assert.Assert(t, request.Releases != nil)
- assert.Assert(t, request.Releases.AllocationsToRelease == nil)
- assert.Assert(t, request.Releases.AllocationAsksToRelease !=
nil)
- assert.Equal(t,
request.Releases.AllocationAsksToRelease[0].ApplicationID, app.applicationID)
- assert.Equal(t,
request.Releases.AllocationAsksToRelease[0].PartitionName, "default")
- assert.Equal(t,
request.Releases.AllocationAsksToRelease[0].AllocationKey, task.taskID)
+ assert.Assert(t, request.Releases.AllocationsToRelease != nil)
+ assert.Equal(t,
request.Releases.AllocationsToRelease[0].ApplicationID, app.applicationID)
+ assert.Equal(t,
request.Releases.AllocationsToRelease[0].PartitionName, "default")
+ assert.Equal(t,
request.Releases.AllocationsToRelease[0].AllocationKey, task.taskID)
return nil
})
@@ -598,11 +587,11 @@ func TestHandleSubmitTaskEvent(t *testing.T) {
assert.Equal(t, task1.GetTaskState(), TaskStates().Scheduling)
assert.Equal(t, rt.time, int64(1))
assert.Assert(t, allocRequest != nil)
- assert.Equal(t, len(allocRequest.Asks), 1)
- assert.Equal(t, allocRequest.Asks[0].Priority, int32(1000))
- assert.Assert(t, allocRequest.Asks[0].PreemptionPolicy != nil)
- assert.Assert(t, allocRequest.Asks[0].PreemptionPolicy.AllowPreemptSelf)
- assert.Assert(t,
!allocRequest.Asks[0].PreemptionPolicy.AllowPreemptOther)
+ assert.Equal(t, len(allocRequest.Allocations), 1)
+ assert.Equal(t, allocRequest.Allocations[0].Priority, int32(1000))
+ assert.Assert(t, allocRequest.Allocations[0].PreemptionPolicy != nil)
+ assert.Assert(t,
allocRequest.Allocations[0].PreemptionPolicy.AllowPreemptSelf)
+ assert.Assert(t,
!allocRequest.Allocations[0].PreemptionPolicy.AllowPreemptOther)
allocRequest = nil
rt.time = 0
// pod with taskGroup name
@@ -612,11 +601,11 @@ func TestHandleSubmitTaskEvent(t *testing.T) {
assert.Equal(t, task2.GetTaskState(), TaskStates().Scheduling)
assert.Equal(t, rt.time, int64(2))
assert.Assert(t, allocRequest != nil)
- assert.Equal(t, len(allocRequest.Asks), 1)
- assert.Equal(t, allocRequest.Asks[0].Priority, int32(1001))
- assert.Assert(t, allocRequest.Asks[0].PreemptionPolicy != nil)
- assert.Assert(t,
!allocRequest.Asks[0].PreemptionPolicy.AllowPreemptSelf)
- assert.Assert(t,
allocRequest.Asks[0].PreemptionPolicy.AllowPreemptOther)
+ assert.Equal(t, len(allocRequest.Allocations), 1)
+ assert.Equal(t, allocRequest.Allocations[0].Priority, int32(1001))
+ assert.Assert(t, allocRequest.Allocations[0].PreemptionPolicy != nil)
+ assert.Assert(t,
!allocRequest.Allocations[0].PreemptionPolicy.AllowPreemptSelf)
+ assert.Assert(t,
allocRequest.Allocations[0].PreemptionPolicy.AllowPreemptOther)
// Test over, set Recorder back fake type
events.SetRecorder(k8sEvents.NewFakeRecorder(1024))
@@ -665,11 +654,9 @@ func TestSimultaneousTaskCompleteAndAllocate(t *testing.T)
{
// because the task is in Scheduling state,
// here we expect to trigger a UpdateRequest that contains a
releaseAllocationAsk request
mockedAPIProvider.MockSchedulerAPIUpdateAllocationFn(func(request
*si.AllocationRequest) error {
- assert.Equal(t, len(request.Releases.AllocationAsksToRelease),
1,
- "allocationAskToRelease is not in the expected length")
- assert.Equal(t, len(request.Releases.AllocationsToRelease), 0,
+ assert.Equal(t, len(request.Releases.AllocationsToRelease), 1,
"allocationsToRelease is not in the expected length")
- askToRelease := request.Releases.AllocationAsksToRelease[0]
+ askToRelease := request.Releases.AllocationsToRelease[0]
assert.Equal(t, askToRelease.ApplicationID, appID)
assert.Equal(t, askToRelease.AllocationKey, podUID)
return nil
@@ -689,8 +676,6 @@ func TestSimultaneousTaskCompleteAndAllocate(t *testing.T) {
PartitionName: "default",
}
mockedAPIProvider.MockSchedulerAPIUpdateAllocationFn(func(request
*si.AllocationRequest) error {
- assert.Equal(t, len(request.Releases.AllocationAsksToRelease),
1,
- "allocationAskToRelease is not in the expected length")
assert.Equal(t, len(request.Releases.AllocationsToRelease), 1,
"allocationsToRelease is not in the expected length")
allocToRelease := request.Releases.AllocationsToRelease[0]
diff --git a/pkg/common/si_helper.go b/pkg/common/si_helper.go
index c3c1a779..82a22530 100644
--- a/pkg/common/si_helper.go
+++ b/pkg/common/si_helper.go
@@ -73,11 +73,11 @@ func CreatePriorityForTask(pod *v1.Pod) int32 {
}
func CreateAllocationRequestForTask(appID, taskID string, resource
*si.Resource, placeholder bool, taskGroupName string, pod *v1.Pod, originator
bool, preemptionPolicy *si.PreemptionPolicy) *si.AllocationRequest {
- ask := si.AllocationAsk{
+ ask := si.Allocation{
AllocationKey: taskID,
- ResourceAsk: resource,
+ ResourcePerAlloc: resource,
ApplicationID: appID,
- Tags: CreateTagsForTask(pod),
+ AllocationTags: CreateTagsForTask(pod),
Placeholder: placeholder,
TaskGroupName: taskGroupName,
Originator: originator,
@@ -86,8 +86,8 @@ func CreateAllocationRequestForTask(appID, taskID string,
resource *si.Resource,
}
return &si.AllocationRequest{
- Asks: []*si.AllocationAsk{&ask},
- RmID: conf.GetSchedulerConf().ClusterID,
+ Allocations: []*si.Allocation{&ask},
+ RmID: conf.GetSchedulerConf().ClusterID,
}
}
@@ -121,30 +121,18 @@ func GetTerminationTypeFromString(terminationTypeStr
string) si.TerminationType
return si.TerminationType_STOPPED_BY_RM
}
-func CreateReleaseRequestForTask(appID, taskID, allocationKey, partition,
terminationType string) *si.AllocationRequest {
- var allocToRelease []*si.AllocationRelease
- if allocationKey != "" {
- allocToRelease = make([]*si.AllocationRelease, 1)
- allocToRelease[0] = &si.AllocationRelease{
- ApplicationID: appID,
- AllocationKey: allocationKey,
- PartitionName: partition,
- TerminationType:
GetTerminationTypeFromString(terminationType),
- Message: "task completed",
- }
- }
-
- askToRelease := make([]*si.AllocationAskRelease, 1)
- askToRelease[0] = &si.AllocationAskRelease{
- ApplicationID: appID,
- AllocationKey: taskID,
- PartitionName: partition,
- Message: "task request completed",
+func CreateReleaseRequestForTask(appID, taskID, partition, terminationType
string) *si.AllocationRequest {
+ allocToRelease := make([]*si.AllocationRelease, 1)
+ allocToRelease[0] = &si.AllocationRelease{
+ ApplicationID: appID,
+ AllocationKey: taskID,
+ PartitionName: partition,
+ TerminationType: GetTerminationTypeFromString(terminationType),
+ Message: "task completed",
}
releaseRequest := si.AllocationReleasesRequest{
- AllocationsToRelease: allocToRelease,
- AllocationAsksToRelease: askToRelease,
+ AllocationsToRelease: allocToRelease,
}
return &si.AllocationRequest{
diff --git a/pkg/common/si_helper_test.go b/pkg/common/si_helper_test.go
index 92f5e240..b9464bd8 100644
--- a/pkg/common/si_helper_test.go
+++ b/pkg/common/si_helper_test.go
@@ -32,45 +32,23 @@ const nodeID = "node-01"
func TestCreateReleaseRequestForTask(t *testing.T) {
// with allocationKey
- request := CreateReleaseRequestForTask("app01", "task01", "task01",
"default", "STOPPED_BY_RM")
+ request := CreateReleaseRequestForTask("app01", "task01", "default",
"STOPPED_BY_RM")
assert.Assert(t, request.Releases != nil)
assert.Assert(t, request.Releases.AllocationsToRelease != nil)
- assert.Assert(t, request.Releases.AllocationAsksToRelease != nil)
assert.Equal(t, len(request.Releases.AllocationsToRelease), 1)
- assert.Equal(t, len(request.Releases.AllocationAsksToRelease), 1)
assert.Equal(t, request.Releases.AllocationsToRelease[0].ApplicationID,
"app01")
assert.Equal(t, request.Releases.AllocationsToRelease[0].AllocationKey,
"task01")
assert.Equal(t, request.Releases.AllocationsToRelease[0].PartitionName,
"default")
- assert.Equal(t,
request.Releases.AllocationAsksToRelease[0].ApplicationID, "app01")
- assert.Equal(t,
request.Releases.AllocationAsksToRelease[0].AllocationKey, "task01")
- assert.Equal(t,
request.Releases.AllocationAsksToRelease[0].PartitionName, "default")
- assert.Equal(t,
request.Releases.AllocationAsksToRelease[0].TerminationType,
si.TerminationType_UNKNOWN_TERMINATION_TYPE)
+ assert.Equal(t,
request.Releases.AllocationsToRelease[0].TerminationType,
si.TerminationType_STOPPED_BY_RM)
- // without allocationKey
- request = CreateReleaseRequestForTask("app01", "task01", "", "default",
"STOPPED_BY_RM")
- assert.Assert(t, request.Releases != nil)
- assert.Assert(t, request.Releases.AllocationsToRelease == nil)
- assert.Assert(t, request.Releases.AllocationAsksToRelease != nil)
- assert.Equal(t, len(request.Releases.AllocationsToRelease), 0)
- assert.Equal(t, len(request.Releases.AllocationAsksToRelease), 1)
- assert.Equal(t,
request.Releases.AllocationAsksToRelease[0].ApplicationID, "app01")
- assert.Equal(t,
request.Releases.AllocationAsksToRelease[0].AllocationKey, "task01")
- assert.Equal(t,
request.Releases.AllocationAsksToRelease[0].PartitionName, "default")
- assert.Equal(t,
request.Releases.AllocationAsksToRelease[0].TerminationType,
si.TerminationType_UNKNOWN_TERMINATION_TYPE)
-
- request = CreateReleaseRequestForTask("app01", "task01", "task01",
"default", "UNKNOWN")
+ request = CreateReleaseRequestForTask("app01", "task01", "default",
"UNKNOWN_TERMINATION_TYPE")
assert.Assert(t, request.Releases != nil)
assert.Assert(t, request.Releases.AllocationsToRelease != nil)
- assert.Assert(t, request.Releases.AllocationAsksToRelease != nil)
assert.Equal(t, len(request.Releases.AllocationsToRelease), 1)
- assert.Equal(t, len(request.Releases.AllocationAsksToRelease), 1)
assert.Equal(t, request.Releases.AllocationsToRelease[0].ApplicationID,
"app01")
assert.Equal(t, request.Releases.AllocationsToRelease[0].AllocationKey,
"task01")
assert.Equal(t, request.Releases.AllocationsToRelease[0].PartitionName,
"default")
- assert.Equal(t,
request.Releases.AllocationAsksToRelease[0].ApplicationID, "app01")
- assert.Equal(t,
request.Releases.AllocationAsksToRelease[0].AllocationKey, "task01")
- assert.Equal(t,
request.Releases.AllocationAsksToRelease[0].PartitionName, "default")
- assert.Equal(t,
request.Releases.AllocationAsksToRelease[0].TerminationType,
si.TerminationType_UNKNOWN_TERMINATION_TYPE)
+ assert.Equal(t,
request.Releases.AllocationsToRelease[0].TerminationType,
si.TerminationType_UNKNOWN_TERMINATION_TYPE)
}
func TestCreateUpdateRequestForRemoveApplication(t *testing.T) {
@@ -112,14 +90,14 @@ func TestCreateUpdateRequestForTask(t *testing.T) {
}
updateRequest := CreateAllocationRequestForTask("appId1", "taskId1",
res, false, "", pod, false, preemptionPolicy)
- asks := updateRequest.Asks
+ asks := updateRequest.Allocations
assert.Equal(t, len(asks), 1)
allocAsk := asks[0]
assert.Assert(t, allocAsk != nil)
assert.Assert(t, allocAsk.PreemptionPolicy != nil)
assert.Equal(t, allocAsk.PreemptionPolicy.AllowPreemptSelf, true)
assert.Equal(t, allocAsk.PreemptionPolicy.AllowPreemptOther, true)
- tags := allocAsk.Tags
+ tags := allocAsk.AllocationTags
assert.Assert(t, tags != nil)
assert.Equal(t, tags[common.DomainK8s+common.GroupMeta+"podName"],
podName)
assert.Equal(t, tags[common.DomainK8s+common.GroupMeta+"namespace"],
namespace)
@@ -287,7 +265,7 @@ func TestCreateAllocationRequestForTask(t *testing.T) {
}
updateRequest := CreateAllocationRequestForTask("appId1", "taskId1",
res, false, "", pod, false, preemptionPolicy)
- asks := updateRequest.Asks
+ asks := updateRequest.Allocations
assert.Equal(t, len(asks), 1)
allocAsk := asks[0]
if allocAsk == nil {
@@ -320,7 +298,7 @@ func TestCreateAllocationRequestForTask(t *testing.T) {
}
updateRequest1 := CreateAllocationRequestForTask("appId1", "taskId1",
res, false, "", pod1, false, preemptionPolicy1)
- asks1 := updateRequest1.Asks
+ asks1 := updateRequest1.Allocations
assert.Equal(t, len(asks1), 1)
allocAsk1 := asks1[0]
if allocAsk1 == nil {
@@ -329,7 +307,7 @@ func TestCreateAllocationRequestForTask(t *testing.T) {
assert.Assert(t, allocAsk1.PreemptionPolicy != nil)
assert.Equal(t, allocAsk1.PreemptionPolicy.AllowPreemptSelf, true)
assert.Equal(t, allocAsk1.PreemptionPolicy.AllowPreemptOther, false)
- tags := allocAsk1.Tags
+ tags := allocAsk1.AllocationTags
assert.Equal(t, tags[common.DomainK8s+common.GroupMeta+"podName"],
podName1)
assert.Equal(t, allocAsk1.Priority, int32(100))
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]