This is an automated email from the ASF dual-hosted git repository.

mani pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-k8shim.git


The following commit(s) were added to refs/heads/master by this push:
     new 4366bbac [YUNIKORN-3226] Remove event creation for succeeded / deleted pods (#1001)
4366bbac is described below

commit 4366bbaca294a00e635c3ffc5e550c0f285f1686
Author: Manikandan R <[email protected]>
AuthorDate: Tue Feb 10 10:14:52 2026 +0530

    [YUNIKORN-3226] Remove event creation for succeeded / deleted pods (#1001)
    
    Closes: #1001
    
    Signed-off-by: Manikandan R <[email protected]>
---
 pkg/cache/application.go |  2 +-
 pkg/cache/context.go     |  6 +++---
 pkg/cache/task.go        | 19 +++++--------------
 pkg/cache/task_state.go  |  5 ++++-
 4 files changed, 13 insertions(+), 19 deletions(-)

diff --git a/pkg/cache/application.go b/pkg/cache/application.go
index d92a4d00..5ea488f5 100644
--- a/pkg/cache/application.go
+++ b/pkg/cache/application.go
@@ -639,11 +639,11 @@ func (app *Application) handleReleaseAppAllocationEvent(taskID string, terminati
 
        if task, ok := app.taskMap[taskID]; ok {
                task.setTaskTerminationType(terminationType)
+               app.publishPlaceholderTimeoutEvents(task)
                err := task.DeleteTaskPod()
                if err != nil {
                        log.Log(log.ShimCacheApplication).Error("failed to 
release allocation from application", zap.Error(err))
                }
-               app.publishPlaceholderTimeoutEvents(task)
        } else {
                log.Log(log.ShimCacheApplication).Warn("task not found",
                        zap.String("appID", app.applicationID),
diff --git a/pkg/cache/context.go b/pkg/cache/context.go
index 1fb26301..8a60786f 100644
--- a/pkg/cache/context.go
+++ b/pkg/cache/context.go
@@ -284,13 +284,13 @@ func (ctx *Context) deleteNodeInternal(node *v1.Node) {
 
        // decommission node
        log.Log(log.ShimContext).Info("Decommissioning node", 
zap.String("nodeName", node.Name))
-       if err := ctx.decommissionNode(node); err != nil {
-               log.Log(log.ShimContext).Warn("Unable to decommission node", 
zap.Error(err))
-       }
 
        // post the event
        events.GetRecorder().Eventf(node.DeepCopy(), nil, v1.EventTypeNormal, 
"NodeDeleted", "NodeDeleted",
                fmt.Sprintf("node %s is deleted from the scheduler", node.Name))
+       if err := ctx.decommissionNode(node); err != nil {
+               log.Log(log.ShimContext).Warn("Unable to decommission node", 
zap.Error(err))
+       }
 }
 
 func (ctx *Context) AddPod(obj interface{}) {
diff --git a/pkg/cache/task.go b/pkg/cache/task.go
index 4dbe69e9..b6e857da 100644
--- a/pkg/cache/task.go
+++ b/pkg/cache/task.go
@@ -445,40 +445,31 @@ func (task *Task) postTaskRejected() {
        // currently, once task is rejected by scheduler, we directly move task 
to failed state.
        // so this function simply triggers the state transition when it is 
rejected.
        // but further, we can introduce retry mechanism if necessary.
-       dispatcher.Dispatch(NewFailTaskEvent(task.applicationID, task.taskID,
-               fmt.Sprintf("task %s failed because it is rejected by 
scheduler", task.alias)))
-
        events.GetRecorder().Eventf(task.pod.DeepCopy(), nil,
                v1.EventTypeWarning, "TaskRejected", "TaskRejected",
                "Task %s is rejected by the scheduler", task.alias)
+       dispatcher.Dispatch(NewFailTaskEvent(task.applicationID, task.taskID,
+               fmt.Sprintf("task %s failed because it is rejected by 
scheduler", task.alias)))
 }
 
 // beforeTaskFail releases the allocation or ask from scheduler core
 // this is done as a before hook because the releaseAllocation() call needs to
 // send different requests to scheduler-core, depending on current task state
 func (task *Task) beforeTaskFail() {
-       task.releaseAllocation()
-}
-
-func (task *Task) postTaskFailed(reason string) {
-       log.Log(log.ShimCacheTask).Error("task failed",
-               zap.String("appID", task.applicationID),
-               zap.String("taskID", task.taskID),
-               zap.String("reason", reason))
        events.GetRecorder().Eventf(task.pod.DeepCopy(), nil,
                v1.EventTypeNormal, "TaskFailed", "TaskFailed",
                "Task %s is failed", task.alias)
+       task.releaseAllocation()
 }
 
 // beforeTaskCompleted releases the allocation or ask from scheduler core
 // this is done as a before hook because the releaseAllocation() call needs to
 // send different requests to scheduler-core, depending on current task state
 func (task *Task) beforeTaskCompleted() {
-       task.releaseAllocation()
-
        events.GetRecorder().Eventf(task.pod.DeepCopy(), nil,
                v1.EventTypeNormal, "TaskCompleted", "TaskCompleted",
                "Task %s is completed", task.alias)
+       task.releaseAllocation()
 }
 
 // releaseAllocation sends the release request for the Allocation to the core.
@@ -634,9 +625,9 @@ func (task *Task) FailWithEvent(errorMessage, actionReason string) {
 }
 
 func (task *Task) failWithEvent(errorMessage, actionReason string) {
-       dispatcher.Dispatch(NewFailTaskEvent(task.applicationID, task.taskID, 
errorMessage))
        events.GetRecorder().Eventf(task.pod.DeepCopy(),
                nil, v1.EventTypeWarning, actionReason, actionReason, 
errorMessage)
+       dispatcher.Dispatch(NewFailTaskEvent(task.applicationID, task.taskID, 
errorMessage))
 }
 
 func (task *Task) SetTaskPod(pod *v1.Pod) {
diff --git a/pkg/cache/task_state.go b/pkg/cache/task_state.go
index 112fdd5a..b2bef7eb 100644
--- a/pkg/cache/task_state.go
+++ b/pkg/cache/task_state.go
@@ -409,7 +409,10 @@ func callbacks(states *TStates) fsm.Callbacks {
                        } else {
                                reason = eventArgs[0]
                        }
-                       task.postTaskFailed(reason)
+                       log.Log(log.ShimCacheTask).Error("task failed",
+                               zap.String("appID", task.applicationID),
+                               zap.String("taskID", task.taskID),
+                               zap.String("reason", reason))
                },
                states.Bound: func(_ context.Context, event *fsm.Event) {
                        task := event.Args[0].(*Task) //nolint:errcheck


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to