This is an automated email from the ASF dual-hosted git repository.
mani pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-k8shim.git
The following commit(s) were added to refs/heads/master by this push:
new 4366bbac [YUNIKORN-3226] Remove event creation for succeeded / deleted pods (#1001)
4366bbac is described below
commit 4366bbaca294a00e635c3ffc5e550c0f285f1686
Author: Manikandan R <[email protected]>
AuthorDate: Tue Feb 10 10:14:52 2026 +0530
[YUNIKORN-3226] Remove event creation for succeeded / deleted pods (#1001)
Closes: #1001
Signed-off-by: Manikandan R <[email protected]>
---
pkg/cache/application.go | 2 +-
pkg/cache/context.go | 6 +++---
pkg/cache/task.go | 19 +++++--------------
pkg/cache/task_state.go | 5 ++++-
4 files changed, 13 insertions(+), 19 deletions(-)
diff --git a/pkg/cache/application.go b/pkg/cache/application.go
index d92a4d00..5ea488f5 100644
--- a/pkg/cache/application.go
+++ b/pkg/cache/application.go
@@ -639,11 +639,11 @@ func (app *Application) handleReleaseAppAllocationEvent(taskID string, terminati
if task, ok := app.taskMap[taskID]; ok {
task.setTaskTerminationType(terminationType)
+ app.publishPlaceholderTimeoutEvents(task)
err := task.DeleteTaskPod()
if err != nil {
log.Log(log.ShimCacheApplication).Error("failed to release allocation from application", zap.Error(err))
}
- app.publishPlaceholderTimeoutEvents(task)
} else {
log.Log(log.ShimCacheApplication).Warn("task not found",
zap.String("appID", app.applicationID),
diff --git a/pkg/cache/context.go b/pkg/cache/context.go
index 1fb26301..8a60786f 100644
--- a/pkg/cache/context.go
+++ b/pkg/cache/context.go
@@ -284,13 +284,13 @@ func (ctx *Context) deleteNodeInternal(node *v1.Node) {
// decommission node
log.Log(log.ShimContext).Info("Decommissioning node",
zap.String("nodeName", node.Name))
- if err := ctx.decommissionNode(node); err != nil {
- log.Log(log.ShimContext).Warn("Unable to decommission node", zap.Error(err))
- }
// post the event
events.GetRecorder().Eventf(node.DeepCopy(), nil, v1.EventTypeNormal,
"NodeDeleted", "NodeDeleted",
fmt.Sprintf("node %s is deleted from the scheduler", node.Name))
+ if err := ctx.decommissionNode(node); err != nil {
+ log.Log(log.ShimContext).Warn("Unable to decommission node", zap.Error(err))
+ }
}
func (ctx *Context) AddPod(obj interface{}) {
diff --git a/pkg/cache/task.go b/pkg/cache/task.go
index 4dbe69e9..b6e857da 100644
--- a/pkg/cache/task.go
+++ b/pkg/cache/task.go
@@ -445,40 +445,31 @@ func (task *Task) postTaskRejected() {
// currently, once task is rejected by scheduler, we directly move task to failed state.
// so this function simply triggers the state transition when it is rejected.
// but further, we can introduce retry mechanism if necessary.
- dispatcher.Dispatch(NewFailTaskEvent(task.applicationID, task.taskID,
- fmt.Sprintf("task %s failed because it is rejected by scheduler", task.alias)))
-
events.GetRecorder().Eventf(task.pod.DeepCopy(), nil,
v1.EventTypeWarning, "TaskRejected", "TaskRejected",
"Task %s is rejected by the scheduler", task.alias)
+ dispatcher.Dispatch(NewFailTaskEvent(task.applicationID, task.taskID,
+ fmt.Sprintf("task %s failed because it is rejected by scheduler", task.alias)))
}
// beforeTaskFail releases the allocation or ask from scheduler core
// this is done as a before hook because the releaseAllocation() call needs to
// send different requests to scheduler-core, depending on current task state
func (task *Task) beforeTaskFail() {
- task.releaseAllocation()
-}
-
-func (task *Task) postTaskFailed(reason string) {
- log.Log(log.ShimCacheTask).Error("task failed",
- zap.String("appID", task.applicationID),
- zap.String("taskID", task.taskID),
- zap.String("reason", reason))
events.GetRecorder().Eventf(task.pod.DeepCopy(), nil,
v1.EventTypeNormal, "TaskFailed", "TaskFailed",
"Task %s is failed", task.alias)
+ task.releaseAllocation()
}
// beforeTaskCompleted releases the allocation or ask from scheduler core
// this is done as a before hook because the releaseAllocation() call needs to
// send different requests to scheduler-core, depending on current task state
func (task *Task) beforeTaskCompleted() {
- task.releaseAllocation()
-
events.GetRecorder().Eventf(task.pod.DeepCopy(), nil,
v1.EventTypeNormal, "TaskCompleted", "TaskCompleted",
"Task %s is completed", task.alias)
+ task.releaseAllocation()
}
// releaseAllocation sends the release request for the Allocation to the core.
@@ -634,9 +625,9 @@ func (task *Task) FailWithEvent(errorMessage, actionReason string) {
}
func (task *Task) failWithEvent(errorMessage, actionReason string) {
- dispatcher.Dispatch(NewFailTaskEvent(task.applicationID, task.taskID, errorMessage))
events.GetRecorder().Eventf(task.pod.DeepCopy(),
nil, v1.EventTypeWarning, actionReason, actionReason,
errorMessage)
+ dispatcher.Dispatch(NewFailTaskEvent(task.applicationID, task.taskID, errorMessage))
}
func (task *Task) SetTaskPod(pod *v1.Pod) {
diff --git a/pkg/cache/task_state.go b/pkg/cache/task_state.go
index 112fdd5a..b2bef7eb 100644
--- a/pkg/cache/task_state.go
+++ b/pkg/cache/task_state.go
@@ -409,7 +409,10 @@ func callbacks(states *TStates) fsm.Callbacks {
} else {
reason = eventArgs[0]
}
- task.postTaskFailed(reason)
+ log.Log(log.ShimCacheTask).Error("task failed",
+ zap.String("appID", task.applicationID),
+ zap.String("taskID", task.taskID),
+ zap.String("reason", reason))
},
states.Bound: func(_ context.Context, event *fsm.Event) {
task := event.Args[0].(*Task) //nolint:errcheck
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]