This is an automated email from the ASF dual-hosted git repository.

ccondit pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-k8shim.git


The following commit(s) were added to refs/heads/master by this push:
     new 2278b321 [YUNIKORN-2779] Shim: Use UpdateAllocation for both asks and 
allocations (#891)
2278b321 is described below

commit 2278b3217c702ccb796e4d623bc7837625e5a4ec
Author: Craig Condit <[email protected]>
AuthorDate: Fri Aug 16 09:55:52 2024 -0500

    [YUNIKORN-2779] Shim: Use UpdateAllocation for both asks and allocations 
(#891)
    
    Closes: #891
---
 go.mod                               |   4 +-
 go.sum                               |   8 +--
 pkg/cache/application.go             |  35 +++---------
 pkg/cache/application_state.go       |  64 ++--------------------
 pkg/cache/application_state_test.go  | 100 -----------------------------------
 pkg/cache/scheduler_callback.go      |  19 -------
 pkg/cache/scheduler_callback_test.go |  48 -----------------
 pkg/cache/task.go                    |  57 +++++++-------------
 pkg/cache/task_test.go               |  59 ++++++++-------------
 pkg/common/si_helper.go              |  40 +++++---------
 pkg/common/si_helper_test.go         |  40 ++++----------
 11 files changed, 80 insertions(+), 394 deletions(-)

diff --git a/go.mod b/go.mod
index 349820d3..685499e7 100644
--- a/go.mod
+++ b/go.mod
@@ -21,8 +21,8 @@ module github.com/apache/yunikorn-k8shim
 go 1.21
 
 require (
-       github.com/apache/yunikorn-core v0.0.0-20240802210614-4aec626c6bf9
-       github.com/apache/yunikorn-scheduler-interface 
v0.0.0-20240731203810-92032b13d586
+       github.com/apache/yunikorn-core v0.0.0-20240815214512-f51aaba68ff2
+       github.com/apache/yunikorn-scheduler-interface 
v0.0.0-20240815142741-38a38685cd4e
        github.com/google/go-cmp v0.6.0
        github.com/google/uuid v1.6.0
        github.com/looplab/fsm v1.0.1
diff --git a/go.sum b/go.sum
index 7408b97b..0c702f23 100644
--- a/go.sum
+++ b/go.sum
@@ -9,10 +9,10 @@ github.com/NYTimes/gziphandler v1.1.1 
h1:ZUDjpQae29j0ryrS0u/B8HZfJBtBQHjqw2rQ2cq
 github.com/NYTimes/gziphandler v1.1.1/go.mod 
h1:n/CVRwUEOgIxrgPvAQhUUr9oeUtvrhMomdKFjzJNB0c=
 github.com/antlr/antlr4/runtime/Go/antlr/v4 v4.0.0-20230305170008-8188dc5388df 
h1:7RFfzj4SSt6nnvCPbCqijJi1nWCd+TqAT3bYCStRC18=
 github.com/antlr/antlr4/runtime/Go/antlr/v4 
v4.0.0-20230305170008-8188dc5388df/go.mod 
h1:pSwJ0fSY5KhvocuWSx4fz3BA8OrA1bQn+K1Eli3BRwM=
-github.com/apache/yunikorn-core v0.0.0-20240802210614-4aec626c6bf9 
h1:s1Co/K+cR9Q/GW0e974dToW9eyLQZxYoCp0TCoEuEj0=
-github.com/apache/yunikorn-core v0.0.0-20240802210614-4aec626c6bf9/go.mod 
h1:S9yGBGA2i2hAajtEc2t4lmiPJDZz3Ek8eVxz5KhJqGI=
-github.com/apache/yunikorn-scheduler-interface 
v0.0.0-20240731203810-92032b13d586 
h1:ZVpo9Qj2/gvwX6Rl44UxkZBm2pZWEJDYWTramc9hwF0=
-github.com/apache/yunikorn-scheduler-interface 
v0.0.0-20240731203810-92032b13d586/go.mod 
h1:WuHJpVk34t8N5+1ErYGj/5Qq33/cRzL4YtuoAsbMtWc=
+github.com/apache/yunikorn-core v0.0.0-20240815214512-f51aaba68ff2 
h1:m1kxL2ce3QfHOsYl5D+AfHn7xjFxP40b88na/7qzmS8=
+github.com/apache/yunikorn-core v0.0.0-20240815214512-f51aaba68ff2/go.mod 
h1:QHKfJ2RyZuQnZg28SnypmnvFxN/zfoYf+hmfxiVdq5g=
+github.com/apache/yunikorn-scheduler-interface 
v0.0.0-20240815142741-38a38685cd4e 
h1:ZOLst6ROwUrgoUQbEdYaz28iKuiU5YNYGtelKsTFhqw=
+github.com/apache/yunikorn-scheduler-interface 
v0.0.0-20240815142741-38a38685cd4e/go.mod 
h1:WuHJpVk34t8N5+1ErYGj/5Qq33/cRzL4YtuoAsbMtWc=
 github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 
h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio=
 github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod 
h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs=
 github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a 
h1:idn718Q4B6AGu/h5Sxe66HYVdqdGu2l9Iebqhi/AEoA=
diff --git a/pkg/cache/application.go b/pkg/cache/application.go
index a6ec1f3f..085465ce 100644
--- a/pkg/cache/application.go
+++ b/pkg/cache/application.go
@@ -627,42 +627,19 @@ func (app *Application) handleFailApplicationEvent(errMsg 
string) {
        }
 }
 
-func (app *Application) handleReleaseAppAllocationEvent(allocationKey string, 
terminationType string) {
-       log.Log(log.ShimCacheApplication).Info("try to release pod from 
application",
-               zap.String("appID", app.applicationID),
-               zap.String("allocationKey", allocationKey),
-               zap.String("terminationType", terminationType))
-
-       for _, task := range app.taskMap {
-               if task.allocationKey == allocationKey {
-                       task.setTaskTerminationType(terminationType)
-                       err := task.DeleteTaskPod()
-                       if err != nil {
-                               log.Log(log.ShimCacheApplication).Error("failed 
to release allocation from application", zap.Error(err))
-                       }
-                       app.publishPlaceholderTimeoutEvents(task)
-               }
-       }
-}
-
-func (app *Application) handleReleaseAppAllocationAskEvent(taskID string, 
terminationType string) {
+func (app *Application) handleReleaseAppAllocationEvent(taskID string, 
terminationType string) {
        log.Log(log.ShimCacheApplication).Info("try to release pod from 
application",
                zap.String("appID", app.applicationID),
                zap.String("taskID", taskID),
                zap.String("terminationType", terminationType))
+
        if task, ok := app.taskMap[taskID]; ok {
                task.setTaskTerminationType(terminationType)
-               if task.IsPlaceholder() {
-                       err := task.DeleteTaskPod()
-                       if err != nil {
-                               log.Log(log.ShimCacheApplication).Error("failed 
to release allocation ask from application", zap.Error(err))
-                       }
-                       app.publishPlaceholderTimeoutEvents(task)
-               } else {
-                       log.Log(log.ShimCacheApplication).Warn("skip to release 
allocation ask, ask is not a placeholder",
-                               zap.String("appID", app.applicationID),
-                               zap.String("taskID", taskID))
+               err := task.DeleteTaskPod()
+               if err != nil {
+                       log.Log(log.ShimCacheApplication).Error("failed to 
release allocation from application", zap.Error(err))
                }
+               app.publishPlaceholderTimeoutEvents(task)
        } else {
                log.Log(log.ShimCacheApplication).Warn("task not found",
                        zap.String("appID", app.applicationID),
diff --git a/pkg/cache/application_state.go b/pkg/cache/application_state.go
index aee1c3e1..59541b24 100644
--- a/pkg/cache/application_state.go
+++ b/pkg/cache/application_state.go
@@ -49,13 +49,12 @@ const (
        KillApplication
        KilledApplication
        ReleaseAppAllocation
-       ReleaseAppAllocationAsk
        ResumingApplication
        AppTaskCompleted
 )
 
 func (ae ApplicationEventType) String() string {
-       return [...]string{"SubmitApplication", "AcceptApplication", 
"TryReserve", "UpdateReservation", "RunApplication", "RejectApplication", 
"CompleteApplication", "FailApplication", "KillApplication", 
"KilledApplication", "ReleaseAppAllocation", "ReleaseAppAllocationAsk", 
"ResumingApplication", "AppTaskCompleted"}[ae]
+       return [...]string{"SubmitApplication", "AcceptApplication", 
"TryReserve", "UpdateReservation", "RunApplication", "RejectApplication", 
"CompleteApplication", "FailApplication", "KillApplication", 
"KilledApplication", "ReleaseAppAllocation", "ResumingApplication", 
"AppTaskCompleted"}[ae]
 }
 
 // ------------------------
@@ -295,37 +294,6 @@ func (re ReleaseAppAllocationEvent) GetEvent() string {
        return re.event.String()
 }
 
-type ReleaseAppAllocationAskEvent struct {
-       applicationID   string
-       taskID          string
-       terminationType string
-       event           ApplicationEventType
-}
-
-func NewReleaseAppAllocationAskEvent(appID string, allocTermination 
si.TerminationType, taskID string) ReleaseAppAllocationAskEvent {
-       return ReleaseAppAllocationAskEvent{
-               applicationID:   appID,
-               taskID:          taskID,
-               terminationType: 
si.TerminationType_name[int32(allocTermination)],
-               event:           ReleaseAppAllocationAsk,
-       }
-}
-
-func (re ReleaseAppAllocationAskEvent) GetApplicationID() string {
-       return re.applicationID
-}
-
-func (re ReleaseAppAllocationAskEvent) GetArgs() []interface{} {
-       args := make([]interface{}, 2)
-       args[0] = re.taskID
-       args[1] = re.terminationType
-       return args
-}
-
-func (re ReleaseAppAllocationAskEvent) GetEvent() string {
-       return re.event.String()
-}
-
 // ------------------------
 // Resuming application
 // ------------------------
@@ -434,7 +402,7 @@ func newAppState() *fsm.FSM { //nolint:funlen
                        },
                        {
                                Name: ReleaseAppAllocation.String(),
-                               Src:  []string{states.Running},
+                               Src:  []string{states.Running, states.Accepted, 
states.Reserving},
                                Dst:  states.Running,
                        },
                        {
@@ -447,21 +415,6 @@ func newAppState() *fsm.FSM { //nolint:funlen
                                Src:  []string{states.Resuming},
                                Dst:  states.Resuming,
                        },
-                       {
-                               Name: ReleaseAppAllocationAsk.String(),
-                               Src:  []string{states.Running, states.Accepted, 
states.Reserving},
-                               Dst:  states.Running,
-                       },
-                       {
-                               Name: ReleaseAppAllocationAsk.String(),
-                               Src:  []string{states.Failing},
-                               Dst:  states.Failing,
-                       },
-                       {
-                               Name: ReleaseAppAllocationAsk.String(),
-                               Src:  []string{states.Resuming},
-                               Dst:  states.Resuming,
-                       },
                        {
                                Name: CompleteApplication.String(),
                                Src:  []string{states.Running},
@@ -543,17 +496,6 @@ func newAppState() *fsm.FSM { //nolint:funlen
                                app.onReservationStateChange()
                        },
                        ReleaseAppAllocation.String(): func(_ context.Context, 
event *fsm.Event) {
-                               app := event.Args[0].(*Application) 
//nolint:errcheck
-                               eventArgs := make([]string, 2)
-                               if err := 
events.GetEventArgsAsStrings(eventArgs, event.Args[1].([]interface{})); err != 
nil {
-                                       log.Log(log.ShimFSM).Error("fail to 
parse event arg", zap.Error(err))
-                                       return
-                               }
-                               allocationKey := eventArgs[0]
-                               terminationType := eventArgs[1]
-                               
app.handleReleaseAppAllocationEvent(allocationKey, terminationType)
-                       },
-                       ReleaseAppAllocationAsk.String(): func(_ 
context.Context, event *fsm.Event) {
                                app := event.Args[0].(*Application) 
//nolint:errcheck
                                eventArgs := make([]string, 2)
                                if err := 
events.GetEventArgsAsStrings(eventArgs, event.Args[1].([]interface{})); err != 
nil {
@@ -562,7 +504,7 @@ func newAppState() *fsm.FSM { //nolint:funlen
                                }
                                taskID := eventArgs[0]
                                terminationType := eventArgs[1]
-                               app.handleReleaseAppAllocationAskEvent(taskID, 
terminationType)
+                               app.handleReleaseAppAllocationEvent(taskID, 
terminationType)
                        },
                        AppTaskCompleted.String(): func(_ context.Context, 
event *fsm.Event) {
                                app := event.Args[0].(*Application) 
//nolint:errcheck
diff --git a/pkg/cache/application_state_test.go 
b/pkg/cache/application_state_test.go
index d16e3012..75626bd9 100644
--- a/pkg/cache/application_state_test.go
+++ b/pkg/cache/application_state_test.go
@@ -769,105 +769,6 @@ func TestReleaseAppAllocationEventGetApplicationID(t 
*testing.T) {
        }
 }
 
-func TestNewReleaseAppAllocationAskEvent(t *testing.T) {
-       tests := []struct {
-               name                         string
-               appID, taskID                string
-               terminationType              si.TerminationType
-               wantID, wantTaskID, wantType string
-               wantEvent                    ApplicationEventType
-       }{
-               {TestCreateName, "testAppId001", "testTaskId001", 
si.TerminationType_TIMEOUT, "testAppId001", "testTaskId001", "TIMEOUT", 
ReleaseAppAllocationAsk},
-       }
-
-       for _, tt := range tests {
-               instance := NewReleaseAppAllocationAskEvent(tt.appID, 
tt.terminationType, tt.taskID)
-               t.Run(tt.name, func(t *testing.T) {
-                       if instance.applicationID != tt.wantID || 
instance.taskID != tt.taskID || instance.terminationType != tt.wantType || 
instance.event != tt.wantEvent {
-                               t.Errorf("want %s %s %s %s, got %s %s %s %s",
-                                       tt.wantID, tt.taskID, tt.wantType, 
tt.wantEvent,
-                                       instance.applicationID, 
instance.taskID, instance.terminationType, instance.event)
-                       }
-               })
-       }
-}
-
-func TestReleaseAppAllocationAskEventGetEvent(t *testing.T) {
-       tests := []struct {
-               name            string
-               appID, taskID   string
-               terminationType si.TerminationType
-               wantEvent       ApplicationEventType
-       }{
-               {TestEventName, "testAppId001", "testTaskId001", 
si.TerminationType_TIMEOUT, ReleaseAppAllocationAsk},
-       }
-
-       for _, tt := range tests {
-               instance := NewReleaseAppAllocationAskEvent(tt.appID, 
tt.terminationType, tt.taskID)
-               event := instance.GetEvent()
-               t.Run(tt.name, func(t *testing.T) {
-                       if event != tt.wantEvent.String() {
-                               t.Errorf("want %s, got %s", tt.wantEvent, event)
-                       }
-               })
-       }
-}
-
-func TestReleaseAppAllocationAskEventGetArgs(t *testing.T) {
-       tests := []struct {
-               name                 string
-               appID, taskID        string
-               terminationType      si.TerminationType
-               wantLen              int
-               wantTaskID, wantType string
-               castOk               []bool
-               wantArg              []string
-       }{
-               {TestArgsName, "testAppId001", "testTaskId001", 
si.TerminationType_TIMEOUT, 2, "testTaskId001", "TIMEOUT", []bool{true, true}, 
[]string{"testTaskId001", "TIMEOUT"}},
-       }
-
-       for _, tt := range tests {
-               instance := NewReleaseAppAllocationAskEvent(tt.appID, 
tt.terminationType, tt.taskID)
-               args := instance.GetArgs()
-               t.Run(tt.name, func(t *testing.T) {
-                       if len(args) != tt.wantLen {
-                               t.Errorf("want %d, got %d", tt.wantLen, 
len(args))
-
-                               for index, arg := range args {
-                                       info, ok := arg.(string)
-                                       if ok != tt.castOk[index] {
-                                               t.Errorf("want %v, got %v", 
tt.castOk[index], ok)
-                                       }
-                                       if info != tt.wantArg[index] {
-                                               t.Errorf("want %s, got %s", 
tt.wantArg[index], info)
-                                       }
-                               }
-                       }
-               })
-       }
-}
-
-func TestReleaseAppAllocationAskEventGetApplicationID(t *testing.T) {
-       tests := []struct {
-               name            string
-               appID, taskID   string
-               terminationType si.TerminationType
-               wantID          string
-       }{
-               {TestAppIDName, "testAppId001", "testTaskId001", 
si.TerminationType_TIMEOUT, "testAppId001"},
-       }
-
-       for _, tt := range tests {
-               instance := NewReleaseAppAllocationAskEvent(tt.appID, 
tt.terminationType, tt.taskID)
-               appID := instance.GetApplicationID()
-               t.Run(tt.name, func(t *testing.T) {
-                       if appID != tt.wantID {
-                               t.Errorf("want %s, got %s", tt.wantID, appID)
-                       }
-               })
-       }
-}
-
 func TestNewResumingApplicationEvent(t *testing.T) {
        tests := []struct {
                name      string
@@ -902,7 +803,6 @@ func TestApplicationEventsAsString(t *testing.T) {
        assert.Equal(t, KillApplication.String(), "KillApplication")
        assert.Equal(t, KilledApplication.String(), "KilledApplication")
        assert.Equal(t, ReleaseAppAllocation.String(), "ReleaseAppAllocation")
-       assert.Equal(t, ReleaseAppAllocationAsk.String(), 
"ReleaseAppAllocationAsk")
        assert.Equal(t, ResumingApplication.String(), "ResumingApplication")
        assert.Equal(t, AppTaskCompleted.String(), "AppTaskCompleted")
 }
diff --git a/pkg/cache/scheduler_callback.go b/pkg/cache/scheduler_callback.go
index 728212bf..0c43b34d 100644
--- a/pkg/cache/scheduler_callback.go
+++ b/pkg/cache/scheduler_callback.go
@@ -77,15 +77,6 @@ func (callback *AsyncRMCallback) UpdateAllocation(response 
*si.AllocationRespons
                }
        }
 
-       for _, reject := range response.Rejected {
-               // request rejected by the scheduler, put it back and try 
scheduling again
-               log.Log(log.ShimRMCallback).Debug("callback: response to 
rejected ask",
-                       zap.String("allocationKey", reject.AllocationKey))
-               dispatcher.Dispatch(NewRejectTaskEvent(reject.ApplicationID, 
reject.AllocationKey,
-                       fmt.Sprintf("task %s ask from application %s is 
rejected by scheduler",
-                               reject.AllocationKey, reject.ApplicationID)))
-       }
-
        for _, reject := range response.RejectedAllocations {
                // request rejected by the scheduler, reject it
                log.Log(log.ShimRMCallback).Debug("callback: response to 
rejected allocation",
@@ -110,16 +101,6 @@ func (callback *AsyncRMCallback) UpdateAllocation(response 
*si.AllocationRespons
                }
        }
 
-       for _, ask := range response.ReleasedAsks {
-               log.Log(log.ShimRMCallback).Debug("callback: response to 
released allocations",
-                       zap.String("allocation key", ask.AllocationKey))
-
-               if ask.TerminationType == si.TerminationType_TIMEOUT {
-                       ev := 
NewReleaseAppAllocationAskEvent(ask.ApplicationID, ask.TerminationType, 
ask.AllocationKey)
-                       dispatcher.Dispatch(ev)
-               }
-       }
-
        return nil
 }
 
diff --git a/pkg/cache/scheduler_callback_test.go 
b/pkg/cache/scheduler_callback_test.go
index 59bd875e..d98f5f6f 100644
--- a/pkg/cache/scheduler_callback_test.go
+++ b/pkg/cache/scheduler_callback_test.go
@@ -133,27 +133,6 @@ func TestUpdateAllocation_NewTask_PodAlreadyAssigned(t 
*testing.T) {
        assert.Equal(t, TaskSchedAllocated, task.schedulingState)
 }
 
-func TestUpdateAllocation_AskRejected(t *testing.T) {
-       callback, context := initCallbackTest(t, false, false)
-       defer dispatcher.UnregisterAllEventHandlers()
-       defer dispatcher.Stop()
-
-       err := callback.UpdateAllocation(&si.AllocationResponse{
-               Rejected: []*si.RejectedAllocationAsk{
-                       {
-                               ApplicationID: appID,
-                               AllocationKey: taskUID1,
-                       },
-               },
-       })
-       assert.NilError(t, err, "error updating allocation")
-       task := context.getTask(appID, taskUID1)
-       err = utils.WaitForCondition(func() bool {
-               return task.GetTaskState() == TaskStates().Failed
-       }, 10*time.Millisecond, time.Second)
-       assert.NilError(t, err, "task has not transitioned to Failed state")
-}
-
 func TestUpdateAllocation_AllocationRejected(t *testing.T) {
        callback, context := initCallbackTest(t, false, false)
        defer dispatcher.UnregisterAllEventHandlers()
@@ -239,33 +218,6 @@ func TestUpdateAllocation_AllocationReleased_StoppedByRM(t 
*testing.T) {
        assert.Error(t, err, "timeout waiting for condition") // pod is not 
expected to be deleted
 }
 
-func TestUpdateAllocation_AskReleased(t *testing.T) {
-       callback, context := initCallbackTest(t, false, true)
-       defer dispatcher.UnregisterAllEventHandlers()
-       defer dispatcher.Stop()
-       app := context.getApplication(appID)
-       app.sm.SetState(ApplicationStates().Running)
-       var deleteCalled atomic.Bool
-       context.apiProvider.(*client.MockedAPIProvider).MockDeleteFn(func(pod 
*v1.Pod) error { //nolint:errcheck
-               deleteCalled.Store(true)
-               return nil
-       })
-
-       err := callback.UpdateAllocation(&si.AllocationResponse{
-               ReleasedAsks: []*si.AllocationAskRelease{
-                       {
-                               ApplicationID:   appID,
-                               AllocationKey:   taskUID1,
-                               TerminationType: si.TerminationType_TIMEOUT,
-                       },
-               },
-       })
-       assert.NilError(t, err, "error updating allocation")
-       assert.Assert(t, !context.schedulerCache.IsAssumedPod(taskUID1))
-       err = utils.WaitForCondition(deleteCalled.Load, 10*time.Millisecond, 
time.Second)
-       assert.NilError(t, err, "pod has not been deleted")
-}
-
 func TestUpdateApplication_Accepted(t *testing.T) {
        callback, context := initCallbackTest(t, false, false)
        defer dispatcher.UnregisterAllEventHandlers()
diff --git a/pkg/cache/task.go b/pkg/cache/task.go
index 02b07d16..c7773965 100644
--- a/pkg/cache/task.go
+++ b/pkg/cache/task.go
@@ -292,40 +292,25 @@ func (task *Task) handleSubmitTaskEvent() {
                AllowPreemptOther: task.isPreemptOtherAllowed(),
        }
 
-       if utils.PodAlreadyBound(task.pod) {
-               // submit allocation
-               rr := common.CreateAllocationForTask(
-                       task.applicationID,
-                       task.taskID,
-                       task.pod.Spec.NodeName,
-                       task.resource,
-                       task.placeholder,
-                       task.taskGroupName,
-                       task.pod,
-                       task.originator,
-                       preemptionPolicy)
-               log.Log(log.ShimCacheTask).Debug("send update request", 
zap.Stringer("request", rr))
-               if err := 
task.context.apiProvider.GetAPIs().SchedulerAPI.UpdateAllocation(rr); err != 
nil {
-                       log.Log(log.ShimCacheTask).Debug("failed to send 
allocation to scheduler", zap.Error(err))
-                       return
-               }
-       } else {
-               // submit allocation ask
-               rr := common.CreateAllocationRequestForTask(
-                       task.applicationID,
-                       task.taskID,
-                       task.resource,
-                       task.placeholder,
-                       task.taskGroupName,
-                       task.pod,
-                       task.originator,
-                       preemptionPolicy)
-               log.Log(log.ShimCacheTask).Debug("send update request", 
zap.Stringer("request", rr))
-               if err := 
task.context.apiProvider.GetAPIs().SchedulerAPI.UpdateAllocation(rr); err != 
nil {
-                       log.Log(log.ShimCacheTask).Debug("failed to send 
scheduling request to scheduler", zap.Error(err))
-                       return
-               }
+       // submit allocation ask
+       rr := common.CreateAllocationForTask(
+               task.applicationID,
+               task.taskID,
+               task.pod.Spec.NodeName,
+               task.resource,
+               task.placeholder,
+               task.taskGroupName,
+               task.pod,
+               task.originator,
+               preemptionPolicy)
+       log.Log(log.ShimCacheTask).Debug("send update request", 
zap.Stringer("request", rr))
+       if err := 
task.context.apiProvider.GetAPIs().SchedulerAPI.UpdateAllocation(rr); err != 
nil {
+               log.Log(log.ShimCacheTask).Debug("failed to send scheduling 
request to scheduler", zap.Error(err))
+               return
+       }
 
+       if !utils.PodAlreadyBound(task.pod) {
+               // if this is a new request, add events to pod
                events.GetRecorder().Eventf(task.pod.DeepCopy(), nil, 
v1.EventTypeNormal, "Scheduling", "Scheduling",
                        "%s is queued and waiting for allocation", task.alias)
                // if this task belongs to a task group, that means the app has 
gang scheduling enabled
@@ -516,7 +501,7 @@ func (task *Task) releaseAllocation() {
                s := TaskStates()
                switch task.GetTaskState() {
                case s.New, s.Pending, s.Scheduling, s.Rejected:
-                       releaseRequest = 
common.CreateReleaseRequestForTask(task.applicationID, task.taskID, 
task.allocationKey, task.application.partition, task.terminationType)
+                       releaseRequest = 
common.CreateReleaseRequestForTask(task.applicationID, task.taskID, 
task.application.partition, task.terminationType)
                default:
                        if task.allocationKey == "" {
                                log.Log(log.ShimCacheTask).Warn("BUG: task 
allocationKey is empty on release",
@@ -525,13 +510,11 @@ func (task *Task) releaseAllocation() {
                                        zap.String("taskAlias", task.alias),
                                        zap.String("task", task.GetTaskState()))
                        }
-                       releaseRequest = common.CreateReleaseRequestForTask(
-                               task.applicationID, task.taskID, 
task.allocationKey, task.application.partition, task.terminationType)
+                       releaseRequest = 
common.CreateReleaseRequestForTask(task.applicationID, task.taskID, 
task.application.partition, task.terminationType)
                }
 
                if releaseRequest.Releases != nil {
                        log.Log(log.ShimCacheTask).Info("releasing allocations",
-                               zap.Int("numOfAsksToRelease", 
len(releaseRequest.Releases.AllocationAsksToRelease)),
                                zap.Int("numOfAllocationsToRelease", 
len(releaseRequest.Releases.AllocationsToRelease)))
                }
                if err := 
task.context.apiProvider.GetAPIs().SchedulerAPI.UpdateAllocation(releaseRequest);
 err != nil {
diff --git a/pkg/cache/task_test.go b/pkg/cache/task_test.go
index 2e2faf8f..1ec5f7d8 100644
--- a/pkg/cache/task_test.go
+++ b/pkg/cache/task_test.go
@@ -224,11 +224,6 @@ func TestReleaseTaskAllocation(t *testing.T) {
                assert.Equal(t, 
request.Releases.AllocationsToRelease[0].ApplicationID, app.applicationID)
                assert.Equal(t, 
request.Releases.AllocationsToRelease[0].PartitionName, "default")
                assert.Equal(t, 
request.Releases.AllocationsToRelease[0].AllocationKey, "task01")
-               assert.Assert(t, request.Releases.AllocationAsksToRelease != 
nil)
-               assert.Equal(t, 
request.Releases.AllocationAsksToRelease[0].ApplicationID, app.applicationID)
-               assert.Equal(t, 
request.Releases.AllocationAsksToRelease[0].AllocationKey, "task01")
-               assert.Equal(t, 
request.Releases.AllocationAsksToRelease[0].PartitionName, "default")
-               assert.Equal(t, 
request.Releases.AllocationAsksToRelease[0].TerminationType, 
si.TerminationType_UNKNOWN_TERMINATION_TYPE)
                return nil
        })
 
@@ -244,12 +239,11 @@ func TestReleaseTaskAllocation(t *testing.T) {
        task = NewTask("task01", app, mockedContext, pod)
        mockedApiProvider.MockSchedulerAPIUpdateAllocationFn(func(request 
*si.AllocationRequest) error {
                assert.Assert(t, request.Releases != nil)
-               assert.Assert(t, request.Releases.AllocationsToRelease == nil)
-               assert.Assert(t, request.Releases.AllocationAsksToRelease != 
nil)
-               assert.Equal(t, 
request.Releases.AllocationAsksToRelease[0].ApplicationID, app.applicationID)
-               assert.Equal(t, 
request.Releases.AllocationAsksToRelease[0].AllocationKey, "task01")
-               assert.Equal(t, 
request.Releases.AllocationAsksToRelease[0].PartitionName, "default")
-               assert.Equal(t, 
request.Releases.AllocationAsksToRelease[0].TerminationType, 
si.TerminationType_UNKNOWN_TERMINATION_TYPE)
+               assert.Assert(t, request.Releases.AllocationsToRelease != nil)
+               assert.Equal(t, 
request.Releases.AllocationsToRelease[0].ApplicationID, app.applicationID)
+               assert.Equal(t, 
request.Releases.AllocationsToRelease[0].AllocationKey, "task01")
+               assert.Equal(t, 
request.Releases.AllocationsToRelease[0].PartitionName, "default")
+               assert.Equal(t, 
request.Releases.AllocationsToRelease[0].TerminationType, 
si.TerminationType_STOPPED_BY_RM)
                return nil
        })
        err = task.handle(NewFailTaskEvent(app.applicationID, "task01", "test 
failure"))
@@ -265,11 +259,7 @@ func TestReleaseTaskAllocation(t *testing.T) {
                assert.Equal(t, 
request.Releases.AllocationsToRelease[0].ApplicationID, app.applicationID)
                assert.Equal(t, 
request.Releases.AllocationsToRelease[0].PartitionName, "default")
                assert.Equal(t, 
request.Releases.AllocationsToRelease[0].AllocationKey, "task01")
-               assert.Assert(t, request.Releases.AllocationAsksToRelease != 
nil)
-               assert.Equal(t, 
request.Releases.AllocationAsksToRelease[0].ApplicationID, app.applicationID)
-               assert.Equal(t, 
request.Releases.AllocationAsksToRelease[0].AllocationKey, "task01")
-               assert.Equal(t, 
request.Releases.AllocationAsksToRelease[0].PartitionName, "default")
-               assert.Equal(t, 
request.Releases.AllocationAsksToRelease[0].TerminationType, 
si.TerminationType_UNKNOWN_TERMINATION_TYPE)
+               assert.Equal(t, 
request.Releases.AllocationsToRelease[0].TerminationType, 
si.TerminationType_STOPPED_BY_RM)
                return nil
        })
        err = task.handle(NewFailTaskEvent(app.applicationID, "task01", "test 
failure"))
@@ -330,11 +320,10 @@ func TestReleaseTaskAsk(t *testing.T) {
        // this is to verify we are sending correct info to the scheduler core
        mockedApiProvider.MockSchedulerAPIUpdateAllocationFn(func(request 
*si.AllocationRequest) error {
                assert.Assert(t, request.Releases != nil)
-               assert.Assert(t, request.Releases.AllocationsToRelease == nil)
-               assert.Assert(t, request.Releases.AllocationAsksToRelease != 
nil)
-               assert.Equal(t, 
request.Releases.AllocationAsksToRelease[0].ApplicationID, app.applicationID)
-               assert.Equal(t, 
request.Releases.AllocationAsksToRelease[0].PartitionName, "default")
-               assert.Equal(t, 
request.Releases.AllocationAsksToRelease[0].AllocationKey, task.taskID)
+               assert.Assert(t, request.Releases.AllocationsToRelease != nil)
+               assert.Equal(t, 
request.Releases.AllocationsToRelease[0].ApplicationID, app.applicationID)
+               assert.Equal(t, 
request.Releases.AllocationsToRelease[0].PartitionName, "default")
+               assert.Equal(t, 
request.Releases.AllocationsToRelease[0].AllocationKey, task.taskID)
                return nil
        })
 
@@ -598,11 +587,11 @@ func TestHandleSubmitTaskEvent(t *testing.T) {
        assert.Equal(t, task1.GetTaskState(), TaskStates().Scheduling)
        assert.Equal(t, rt.time, int64(1))
        assert.Assert(t, allocRequest != nil)
-       assert.Equal(t, len(allocRequest.Asks), 1)
-       assert.Equal(t, allocRequest.Asks[0].Priority, int32(1000))
-       assert.Assert(t, allocRequest.Asks[0].PreemptionPolicy != nil)
-       assert.Assert(t, allocRequest.Asks[0].PreemptionPolicy.AllowPreemptSelf)
-       assert.Assert(t, 
!allocRequest.Asks[0].PreemptionPolicy.AllowPreemptOther)
+       assert.Equal(t, len(allocRequest.Allocations), 1)
+       assert.Equal(t, allocRequest.Allocations[0].Priority, int32(1000))
+       assert.Assert(t, allocRequest.Allocations[0].PreemptionPolicy != nil)
+       assert.Assert(t, 
allocRequest.Allocations[0].PreemptionPolicy.AllowPreemptSelf)
+       assert.Assert(t, 
!allocRequest.Allocations[0].PreemptionPolicy.AllowPreemptOther)
        allocRequest = nil
        rt.time = 0
        // pod with taskGroup name
@@ -612,11 +601,11 @@ func TestHandleSubmitTaskEvent(t *testing.T) {
        assert.Equal(t, task2.GetTaskState(), TaskStates().Scheduling)
        assert.Equal(t, rt.time, int64(2))
        assert.Assert(t, allocRequest != nil)
-       assert.Equal(t, len(allocRequest.Asks), 1)
-       assert.Equal(t, allocRequest.Asks[0].Priority, int32(1001))
-       assert.Assert(t, allocRequest.Asks[0].PreemptionPolicy != nil)
-       assert.Assert(t, 
!allocRequest.Asks[0].PreemptionPolicy.AllowPreemptSelf)
-       assert.Assert(t, 
allocRequest.Asks[0].PreemptionPolicy.AllowPreemptOther)
+       assert.Equal(t, len(allocRequest.Allocations), 1)
+       assert.Equal(t, allocRequest.Allocations[0].Priority, int32(1001))
+       assert.Assert(t, allocRequest.Allocations[0].PreemptionPolicy != nil)
+       assert.Assert(t, 
!allocRequest.Allocations[0].PreemptionPolicy.AllowPreemptSelf)
+       assert.Assert(t, 
allocRequest.Allocations[0].PreemptionPolicy.AllowPreemptOther)
 
        // Test over, set Recorder back fake type
        events.SetRecorder(k8sEvents.NewFakeRecorder(1024))
@@ -665,11 +654,9 @@ func TestSimultaneousTaskCompleteAndAllocate(t *testing.T) 
{
        // because the task is in Scheduling state,
        // here we expect to trigger a UpdateRequest that contains a 
releaseAllocationAsk request
        mockedAPIProvider.MockSchedulerAPIUpdateAllocationFn(func(request 
*si.AllocationRequest) error {
-               assert.Equal(t, len(request.Releases.AllocationAsksToRelease), 
1,
-                       "allocationAskToRelease is not in the expected length")
-               assert.Equal(t, len(request.Releases.AllocationsToRelease), 0,
+               assert.Equal(t, len(request.Releases.AllocationsToRelease), 1,
                        "allocationsToRelease is not in the expected length")
-               askToRelease := request.Releases.AllocationAsksToRelease[0]
+               askToRelease := request.Releases.AllocationsToRelease[0]
                assert.Equal(t, askToRelease.ApplicationID, appID)
                assert.Equal(t, askToRelease.AllocationKey, podUID)
                return nil
@@ -689,8 +676,6 @@ func TestSimultaneousTaskCompleteAndAllocate(t *testing.T) {
                PartitionName: "default",
        }
        mockedAPIProvider.MockSchedulerAPIUpdateAllocationFn(func(request 
*si.AllocationRequest) error {
-               assert.Equal(t, len(request.Releases.AllocationAsksToRelease), 
1,
-                       "allocationAskToRelease is not in the expected length")
                assert.Equal(t, len(request.Releases.AllocationsToRelease), 1,
                        "allocationsToRelease is not in the expected length")
                allocToRelease := request.Releases.AllocationsToRelease[0]
diff --git a/pkg/common/si_helper.go b/pkg/common/si_helper.go
index c3c1a779..82a22530 100644
--- a/pkg/common/si_helper.go
+++ b/pkg/common/si_helper.go
@@ -73,11 +73,11 @@ func CreatePriorityForTask(pod *v1.Pod) int32 {
 }
 
 func CreateAllocationRequestForTask(appID, taskID string, resource 
*si.Resource, placeholder bool, taskGroupName string, pod *v1.Pod, originator 
bool, preemptionPolicy *si.PreemptionPolicy) *si.AllocationRequest {
-       ask := si.AllocationAsk{
+       ask := si.Allocation{
                AllocationKey:    taskID,
-               ResourceAsk:      resource,
+               ResourcePerAlloc: resource,
                ApplicationID:    appID,
-               Tags:             CreateTagsForTask(pod),
+               AllocationTags:   CreateTagsForTask(pod),
                Placeholder:      placeholder,
                TaskGroupName:    taskGroupName,
                Originator:       originator,
@@ -86,8 +86,8 @@ func CreateAllocationRequestForTask(appID, taskID string, 
resource *si.Resource,
        }
 
        return &si.AllocationRequest{
-               Asks: []*si.AllocationAsk{&ask},
-               RmID: conf.GetSchedulerConf().ClusterID,
+               Allocations: []*si.Allocation{&ask},
+               RmID:        conf.GetSchedulerConf().ClusterID,
        }
 }
 
@@ -121,30 +121,18 @@ func GetTerminationTypeFromString(terminationTypeStr 
string) si.TerminationType
        return si.TerminationType_STOPPED_BY_RM
 }
 
-func CreateReleaseRequestForTask(appID, taskID, allocationKey, partition, 
terminationType string) *si.AllocationRequest {
-       var allocToRelease []*si.AllocationRelease
-       if allocationKey != "" {
-               allocToRelease = make([]*si.AllocationRelease, 1)
-               allocToRelease[0] = &si.AllocationRelease{
-                       ApplicationID:   appID,
-                       AllocationKey:   allocationKey,
-                       PartitionName:   partition,
-                       TerminationType: 
GetTerminationTypeFromString(terminationType),
-                       Message:         "task completed",
-               }
-       }
-
-       askToRelease := make([]*si.AllocationAskRelease, 1)
-       askToRelease[0] = &si.AllocationAskRelease{
-               ApplicationID: appID,
-               AllocationKey: taskID,
-               PartitionName: partition,
-               Message:       "task request completed",
+func CreateReleaseRequestForTask(appID, taskID, partition, terminationType 
string) *si.AllocationRequest {
+       allocToRelease := make([]*si.AllocationRelease, 1)
+       allocToRelease[0] = &si.AllocationRelease{
+               ApplicationID:   appID,
+               AllocationKey:   taskID,
+               PartitionName:   partition,
+               TerminationType: GetTerminationTypeFromString(terminationType),
+               Message:         "task completed",
        }
 
        releaseRequest := si.AllocationReleasesRequest{
-               AllocationsToRelease:    allocToRelease,
-               AllocationAsksToRelease: askToRelease,
+               AllocationsToRelease: allocToRelease,
        }
 
        return &si.AllocationRequest{
diff --git a/pkg/common/si_helper_test.go b/pkg/common/si_helper_test.go
index 92f5e240..b9464bd8 100644
--- a/pkg/common/si_helper_test.go
+++ b/pkg/common/si_helper_test.go
@@ -32,45 +32,23 @@ const nodeID = "node-01"
 
 func TestCreateReleaseRequestForTask(t *testing.T) {
        // with allocationKey
-       request := CreateReleaseRequestForTask("app01", "task01", "task01", 
"default", "STOPPED_BY_RM")
+       request := CreateReleaseRequestForTask("app01", "task01", "default", 
"STOPPED_BY_RM")
        assert.Assert(t, request.Releases != nil)
        assert.Assert(t, request.Releases.AllocationsToRelease != nil)
-       assert.Assert(t, request.Releases.AllocationAsksToRelease != nil)
        assert.Equal(t, len(request.Releases.AllocationsToRelease), 1)
-       assert.Equal(t, len(request.Releases.AllocationAsksToRelease), 1)
        assert.Equal(t, request.Releases.AllocationsToRelease[0].ApplicationID, 
"app01")
        assert.Equal(t, request.Releases.AllocationsToRelease[0].AllocationKey, 
"task01")
        assert.Equal(t, request.Releases.AllocationsToRelease[0].PartitionName, 
"default")
-       assert.Equal(t, 
request.Releases.AllocationAsksToRelease[0].ApplicationID, "app01")
-       assert.Equal(t, 
request.Releases.AllocationAsksToRelease[0].AllocationKey, "task01")
-       assert.Equal(t, 
request.Releases.AllocationAsksToRelease[0].PartitionName, "default")
-       assert.Equal(t, 
request.Releases.AllocationAsksToRelease[0].TerminationType, 
si.TerminationType_UNKNOWN_TERMINATION_TYPE)
+       assert.Equal(t, 
request.Releases.AllocationsToRelease[0].TerminationType, 
si.TerminationType_STOPPED_BY_RM)
 
-       // without allocationKey
-       request = CreateReleaseRequestForTask("app01", "task01", "", "default", 
"STOPPED_BY_RM")
-       assert.Assert(t, request.Releases != nil)
-       assert.Assert(t, request.Releases.AllocationsToRelease == nil)
-       assert.Assert(t, request.Releases.AllocationAsksToRelease != nil)
-       assert.Equal(t, len(request.Releases.AllocationsToRelease), 0)
-       assert.Equal(t, len(request.Releases.AllocationAsksToRelease), 1)
-       assert.Equal(t, 
request.Releases.AllocationAsksToRelease[0].ApplicationID, "app01")
-       assert.Equal(t, 
request.Releases.AllocationAsksToRelease[0].AllocationKey, "task01")
-       assert.Equal(t, 
request.Releases.AllocationAsksToRelease[0].PartitionName, "default")
-       assert.Equal(t, 
request.Releases.AllocationAsksToRelease[0].TerminationType, 
si.TerminationType_UNKNOWN_TERMINATION_TYPE)
-
-       request = CreateReleaseRequestForTask("app01", "task01", "task01", 
"default", "UNKNOWN")
+       request = CreateReleaseRequestForTask("app01", "task01", "default", 
"UNKNOWN_TERMINATION_TYPE")
        assert.Assert(t, request.Releases != nil)
        assert.Assert(t, request.Releases.AllocationsToRelease != nil)
-       assert.Assert(t, request.Releases.AllocationAsksToRelease != nil)
        assert.Equal(t, len(request.Releases.AllocationsToRelease), 1)
-       assert.Equal(t, len(request.Releases.AllocationAsksToRelease), 1)
        assert.Equal(t, request.Releases.AllocationsToRelease[0].ApplicationID, 
"app01")
        assert.Equal(t, request.Releases.AllocationsToRelease[0].AllocationKey, 
"task01")
        assert.Equal(t, request.Releases.AllocationsToRelease[0].PartitionName, 
"default")
-       assert.Equal(t, 
request.Releases.AllocationAsksToRelease[0].ApplicationID, "app01")
-       assert.Equal(t, 
request.Releases.AllocationAsksToRelease[0].AllocationKey, "task01")
-       assert.Equal(t, 
request.Releases.AllocationAsksToRelease[0].PartitionName, "default")
-       assert.Equal(t, 
request.Releases.AllocationAsksToRelease[0].TerminationType, 
si.TerminationType_UNKNOWN_TERMINATION_TYPE)
+       assert.Equal(t, 
request.Releases.AllocationsToRelease[0].TerminationType, 
si.TerminationType_UNKNOWN_TERMINATION_TYPE)
 }
 
 func TestCreateUpdateRequestForRemoveApplication(t *testing.T) {
@@ -112,14 +90,14 @@ func TestCreateUpdateRequestForTask(t *testing.T) {
        }
 
        updateRequest := CreateAllocationRequestForTask("appId1", "taskId1", 
res, false, "", pod, false, preemptionPolicy)
-       asks := updateRequest.Asks
+       asks := updateRequest.Allocations
        assert.Equal(t, len(asks), 1)
        allocAsk := asks[0]
        assert.Assert(t, allocAsk != nil)
        assert.Assert(t, allocAsk.PreemptionPolicy != nil)
        assert.Equal(t, allocAsk.PreemptionPolicy.AllowPreemptSelf, true)
        assert.Equal(t, allocAsk.PreemptionPolicy.AllowPreemptOther, true)
-       tags := allocAsk.Tags
+       tags := allocAsk.AllocationTags
        assert.Assert(t, tags != nil)
        assert.Equal(t, tags[common.DomainK8s+common.GroupMeta+"podName"], 
podName)
        assert.Equal(t, tags[common.DomainK8s+common.GroupMeta+"namespace"], 
namespace)
@@ -287,7 +265,7 @@ func TestCreateAllocationRequestForTask(t *testing.T) {
        }
 
        updateRequest := CreateAllocationRequestForTask("appId1", "taskId1", 
res, false, "", pod, false, preemptionPolicy)
-       asks := updateRequest.Asks
+       asks := updateRequest.Allocations
        assert.Equal(t, len(asks), 1)
        allocAsk := asks[0]
        if allocAsk == nil {
@@ -320,7 +298,7 @@ func TestCreateAllocationRequestForTask(t *testing.T) {
        }
 
        updateRequest1 := CreateAllocationRequestForTask("appId1", "taskId1", 
res, false, "", pod1, false, preemptionPolicy1)
-       asks1 := updateRequest1.Asks
+       asks1 := updateRequest1.Allocations
        assert.Equal(t, len(asks1), 1)
        allocAsk1 := asks1[0]
        if allocAsk1 == nil {
@@ -329,7 +307,7 @@ func TestCreateAllocationRequestForTask(t *testing.T) {
        assert.Assert(t, allocAsk1.PreemptionPolicy != nil)
        assert.Equal(t, allocAsk1.PreemptionPolicy.AllowPreemptSelf, true)
        assert.Equal(t, allocAsk1.PreemptionPolicy.AllowPreemptOther, false)
-       tags := allocAsk1.Tags
+       tags := allocAsk1.AllocationTags
        assert.Equal(t, tags[common.DomainK8s+common.GroupMeta+"podName"], 
podName1)
        assert.Equal(t, allocAsk1.Priority, int32(100))
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to