This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-k8shim.git
The following commit(s) were added to refs/heads/master by this push:
new 286abb64 [YUNIKORN-2724] Improve the signature of methods notifyTaskComplete() and ensureAppAndTaskCreated() (#873)
286abb64 is described below
commit 286abb6403d65ad8098e9299c9f66027c5a10def
Author: ryankert <[email protected]>
AuthorDate: Mon Aug 12 04:44:45 2024 +0800
[YUNIKORN-2724] Improve the signature of methods notifyTaskComplete() and ensureAppAndTaskCreated() (#873)
Closes: #873
Signed-off-by: Chia-Ping Tsai <[email protected]>
---
pkg/cache/context.go | 56 ++++++++++++++++++++++-------------------------
pkg/cache/context_test.go | 4 ++--
2 files changed, 28 insertions(+), 32 deletions(-)
diff --git a/pkg/cache/context.go b/pkg/cache/context.go
index 211bf0e9..6b791923 100644
--- a/pkg/cache/context.go
+++ b/pkg/cache/context.go
@@ -295,9 +295,9 @@ func (ctx *Context) UpdatePod(_, newObj interface{}) {
}
func (ctx *Context) updateYuniKornPod(appID string, pod *v1.Pod) {
- var app *Application
taskID := string(pod.UID)
- if app = ctx.getApplication(appID); app != nil {
+ app := ctx.getApplication(appID)
+ if app != nil {
if task := app.GetTask(taskID); task != nil {
task.setTaskPod(pod)
}
@@ -305,7 +305,7 @@ func (ctx *Context) updateYuniKornPod(appID string, pod *v1.Pod) {
// treat terminated pods like a remove
if utils.IsPodTerminated(pod) {
- ctx.notifyTaskComplete(appID, taskID)
+ ctx.notifyTaskComplete(app, taskID)
log.Log(log.ShimContext).Debug("Request to update terminated pod, removing from cache", zap.String("podName", pod.Name))
ctx.schedulerCache.RemovePod(pod)
return
@@ -313,23 +313,21 @@ func (ctx *Context) updateYuniKornPod(appID string, pod *v1.Pod) {
if ctx.schedulerCache.UpdatePod(pod) {
// pod was accepted; ensure the application and task objects have been created
- ctx.ensureAppAndTaskCreated(pod)
+ ctx.ensureAppAndTaskCreated(pod, app)
}
}
-func (ctx *Context) ensureAppAndTaskCreated(pod *v1.Pod) {
- // get app metadata
- appMeta, ok := getAppMetadata(pod)
- if !ok {
- log.Log(log.ShimContext).Warn("BUG: Unable to retrieve application metadata from YuniKorn-managed Pod",
- zap.String("namespace", pod.Namespace),
- zap.String("name", pod.Name))
- return
- }
-
+func (ctx *Context) ensureAppAndTaskCreated(pod *v1.Pod, app *Application) {
// add app if it doesn't already exist
- app := ctx.GetApplication(appMeta.ApplicationID)
if app == nil {
+ // get app metadata
+ appMeta, ok := getAppMetadata(pod)
+ if !ok {
+ log.Log(log.ShimContext).Warn("BUG: Unable to retrieve application metadata from YuniKorn-managed Pod",
+ zap.String("namespace", pod.Namespace),
+ zap.String("name", pod.Name))
+ return
+ }
app = ctx.AddApplication(&AddApplicationRequest{
Metadata: appMeta,
})
@@ -435,9 +433,7 @@ func (ctx *Context) DeletePod(obj interface{}) {
func (ctx *Context) deleteYuniKornPod(pod *v1.Pod) {
if taskMeta, ok := getTaskMetadata(pod); ok {
- if app := ctx.GetApplication(taskMeta.ApplicationID); app != nil {
- ctx.notifyTaskComplete(taskMeta.ApplicationID, taskMeta.TaskID)
- }
+ ctx.notifyTaskComplete(ctx.GetApplication(taskMeta.ApplicationID), taskMeta.TaskID)
}
log.Log(log.ShimContext).Debug("removing pod from cache", zap.String("podName", pod.Name))
@@ -873,19 +869,19 @@ func (ctx *Context) StartPodAllocation(podKey string, nodeID string) bool {
return ctx.schedulerCache.StartPodAllocation(podKey, nodeID)
}
-func (ctx *Context) notifyTaskComplete(appID, taskID string) {
- log.Log(log.ShimContext).Debug("NotifyTaskComplete",
- zap.String("appID", appID),
- zap.String("taskID", taskID))
- if app := ctx.GetApplication(appID); app != nil {
- log.Log(log.ShimContext).Debug("release allocation",
- zap.String("appID", appID),
+func (ctx *Context) notifyTaskComplete(app *Application, taskID string) {
+ if app == nil {
+ log.Log(log.ShimContext).Debug("In notifyTaskComplete but app is nil",
zap.String("taskID", taskID))
- ev := NewSimpleTaskEvent(appID, taskID, CompleteTask)
- dispatcher.Dispatch(ev)
- if app.GetApplicationState() == ApplicationStates().Resuming {
- dispatcher.Dispatch(NewSimpleApplicationEvent(appID, AppTaskCompleted))
- }
+ return
+ }
+ log.Log(log.ShimContext).Debug("release allocation in notifyTaskComplete",
+ zap.String("appID", app.applicationID),
+ zap.String("taskID", taskID))
+ ev := NewSimpleTaskEvent(app.applicationID, taskID, CompleteTask)
+ dispatcher.Dispatch(ev)
+ if app.GetApplicationState() == ApplicationStates().Resuming {
+ dispatcher.Dispatch(NewSimpleApplicationEvent(app.applicationID, AppTaskCompleted))
}
}
diff --git a/pkg/cache/context_test.go b/pkg/cache/context_test.go
index ba661c39..6d7bfafa 100644
--- a/pkg/cache/context_test.go
+++ b/pkg/cache/context_test.go
@@ -1034,7 +1034,7 @@ func TestTaskReleaseAfterRecovery(t *testing.T) {
assert.Equal(t, len(app.GetBoundTasks()), 2)
// release one of the tasks
- context.notifyTaskComplete(appID, pod2UID)
+ context.notifyTaskComplete(app, pod2UID)
// wait for release
err = utils.WaitForCondition(func() bool {
@@ -2075,7 +2075,7 @@ func TestTaskRemoveOnCompletion(t *testing.T) {
assert.NilError(t, err)
// mark completion
- context.notifyTaskComplete(appID, taskUID1)
+ context.notifyTaskComplete(app, taskUID1)
err = utils.WaitForCondition(func() bool {
return task.GetTaskState() == TaskStates().Completed
}, 100*time.Millisecond, time.Second)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]