This is an automated email from the ASF dual-hosted git repository.
manirajv06 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-k8shim.git
The following commit(s) were added to refs/heads/master by this push:
new 214a23da [YUNIKORN-3273] Remove code: clean up scheduler_plugin.go (#1022)
214a23da is described below
commit 214a23da9f41d1ebd5c11ffd9c37f2ee7ecc406e
Author: Manikandan R <[email protected]>
AuthorDate: Wed May 6 12:44:21 2026 +0530
[YUNIKORN-3273] Remove code: clean up scheduler_plugin.go (#1022)
Closes: #1022
Signed-off-by: Manikandan R <[email protected]>
---
pkg/plugin/scheduler_plugin.go | 327 -----------------------------------------
1 file changed, 327 deletions(-)
diff --git a/pkg/plugin/scheduler_plugin.go b/pkg/plugin/scheduler_plugin.go
deleted file mode 100644
index 0dd05d29..00000000
--- a/pkg/plugin/scheduler_plugin.go
+++ /dev/null
@@ -1,327 +0,0 @@
-/*
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
-*/
-
-package plugin
-
-import (
- "context"
- "fmt"
-
- "go.uber.org/zap"
- v1 "k8s.io/api/core/v1"
- "k8s.io/apimachinery/pkg/runtime"
- "k8s.io/apimachinery/pkg/util/sets"
- "k8s.io/client-go/informers"
- "k8s.io/klog/v2"
- fwk "k8s.io/kube-scheduler/framework"
- "k8s.io/kubernetes/pkg/scheduler/framework"
-
- "github.com/apache/yunikorn-core/pkg/entrypoint"
- "github.com/apache/yunikorn-k8shim/pkg/cache"
- "github.com/apache/yunikorn-k8shim/pkg/client"
- "github.com/apache/yunikorn-k8shim/pkg/common/events"
- "github.com/apache/yunikorn-k8shim/pkg/common/utils"
- "github.com/apache/yunikorn-k8shim/pkg/conf"
- "github.com/apache/yunikorn-k8shim/pkg/dispatcher"
- "github.com/apache/yunikorn-k8shim/pkg/locking"
- "github.com/apache/yunikorn-k8shim/pkg/log"
- "github.com/apache/yunikorn-k8shim/pkg/shim"
-)
-
-const (
- SchedulerPluginName = "YuniKornPlugin"
-)
-
-// YuniKornSchedulerPlugin provides an implementation of several lifecycle methods of the Kubernetes scheduling framework:
-//
-// https://kubernetes.io/docs/concepts/scheduling-eviction/scheduling-framework/
-//
-// PreFilter: Used to notify the default scheduler that a particular pod has been marked ready for scheduling by YuniKorn
-//
-// Filter: Used to notify the default scheduler that a particular pod/node combination is ready to be scheduled
-//
-// PostBind: Used to notify YuniKorn that a pod has been scheduled successfully
-//
-// Pod Allocations:
-//
-// The YuniKorn scheduler is always running in the background, making decisions about which pods to allocate to which
-// nodes. When a decision is made, that pod is marked as having a "pending" pod allocation, which means YuniKorn has
-// allocated the pod, but the default scheduler (via the plugin interface) has not yet been notified.
-//
-// Once PreFilter() has been called for a particular pod, that allocation is marked as "in progress" meaning it has been
-// communicated to the default scheduler, but has not yet been fulfilled.
-//
-// Finally, in PostBind(), the allocation is removed as we now know that the pod has been allocated successfully.
-// If a pending or in-progress allocation is detected for a pod in PreFilter(), we remove the allocation and force the
-// pod to be rescheduled, as this means the prior allocation could not be completed successfully by the default
-// scheduler for some reason.
-type YuniKornSchedulerPlugin struct {
- locking.RWMutex
- context *cache.Context
-}
-
-// ensure all required interfaces are implemented
-var _ framework.PreEnqueuePlugin = &YuniKornSchedulerPlugin{}
-var _ framework.PreFilterPlugin = &YuniKornSchedulerPlugin{}
-var _ framework.FilterPlugin = &YuniKornSchedulerPlugin{}
-var _ framework.PostBindPlugin = &YuniKornSchedulerPlugin{}
-var _ framework.EnqueueExtensions = &YuniKornSchedulerPlugin{}
-
-// Name returns the name of the plugin
-func (sp *YuniKornSchedulerPlugin) Name() string {
- return SchedulerPluginName
-}
-
-// PreEnqueue is called prior to adding Pods to activeQ
-func (sp *YuniKornSchedulerPlugin) PreEnqueue(_ context.Context, pod *v1.Pod)
*fwk.Status {
- log.Log(log.ShimSchedulerPlugin).Debug("PreEnqueue check",
- zap.String("namespace", pod.Namespace),
- zap.String("pod", pod.Name))
-
- // we don't process pods without appID defined
- appID := utils.GetApplicationIDFromPod(pod)
- if appID == "" {
- log.Log(log.ShimSchedulerPlugin).Debug("Releasing non-managed
Pod for scheduling (PreEnqueue phase)",
- zap.String("namespace", pod.Namespace),
- zap.String("pod", pod.Name))
- return nil
- }
-
- taskID := string(pod.UID)
- if app, task, ok := sp.getTask(appID, taskID); ok {
- if _, ok := sp.context.GetInProgressPodAllocation(taskID); ok {
- // pod must have failed scheduling in a prior run,
reject it and return unschedulable
- sp.failTask(pod, app, task)
- return fwk.NewStatus(fwk.UnschedulableAndUnresolvable,
"Pod is not ready for scheduling")
- }
-
- nodeID, ok := sp.context.GetPendingPodAllocation(taskID)
- if task.GetTaskState() == cache.TaskStates().Bound && ok {
- log.Log(log.ShimSchedulerPlugin).Info("Releasing pod
for scheduling (PreEnqueue phase)",
- zap.String("namespace", pod.Namespace),
- zap.String("pod", pod.Name),
- zap.String("taskID", taskID),
- zap.String("assignedNode", nodeID))
- return nil
- }
-
- schedState := task.GetTaskSchedulingState()
- switch schedState {
- case cache.TaskSchedPending:
- return fwk.NewStatus(fwk.UnschedulableAndUnresolvable,
"Pod is pending scheduling")
- case cache.TaskSchedFailed:
- // allow the pod to proceed so that it will be marked
unschedulable by PreFilter
- return nil
- case cache.TaskSchedSkipped:
- return fwk.NewStatus(fwk.UnschedulableAndUnresolvable,
"Pod doesn't fit within queue")
- default:
- return fwk.NewStatus(fwk.UnschedulableAndUnresolvable,
fmt.Sprintf("Pod unschedulable: %s", schedState.String()))
- }
- }
-
- // task not found (yet?) -- probably means cache update hasn't come
through yet
- return fwk.NewStatus(fwk.UnschedulableAndUnresolvable, "Pod not ready
for scheduling")
-}
-
-// PreFilter is used to release pods to scheduler
-func (sp *YuniKornSchedulerPlugin) PreFilter(_ context.Context, _
fwk.CycleState, pod *v1.Pod, _ []fwk.NodeInfo) (*framework.PreFilterResult,
*fwk.Status) {
- log.Log(log.ShimSchedulerPlugin).Debug("PreFilter check",
- zap.String("namespace", pod.Namespace),
- zap.String("pod", pod.Name))
-
- // we don't process pods without appID defined
- appID := utils.GetApplicationIDFromPod(pod)
- if appID == "" {
- log.Log(log.ShimSchedulerPlugin).Debug("Releasing non-managed
Pod for scheduling (PreFilter phase)",
- zap.String("namespace", pod.Namespace),
- zap.String("pod", pod.Name))
-
- return nil, fwk.NewStatus(fwk.Skip)
- }
-
- taskID := string(pod.UID)
- if app, task, ok := sp.getTask(appID, taskID); ok {
- if _, ok := sp.context.GetInProgressPodAllocation(taskID); ok {
- // pod must have failed scheduling, reject it and
return unschedulable
- sp.failTask(pod, app, task)
- return nil,
fwk.NewStatus(fwk.UnschedulableAndUnresolvable, "Pod is not ready for
scheduling")
- }
-
- nodeID, ok := sp.context.GetPendingPodAllocation(taskID)
- if task.GetTaskState() == cache.TaskStates().Bound && ok {
- log.Log(log.ShimSchedulerPlugin).Info("Releasing pod
for scheduling (PreFilter phase)",
- zap.String("namespace", pod.Namespace),
- zap.String("pod", pod.Name),
- zap.String("taskID", taskID),
- zap.String("assignedNode", nodeID))
- return &framework.PreFilterResult{NodeNames:
sets.New[string](nodeID)}, nil
- }
- }
-
- return nil, fwk.NewStatus(fwk.UnschedulableAndUnresolvable, "Pod is not
ready for scheduling")
-}
-
-// PreFilterExtensions is unused
-func (sp *YuniKornSchedulerPlugin) PreFilterExtensions()
framework.PreFilterExtensions {
- return nil
-}
-
-// Filter is used to release specific pod/node combinations to scheduler
-func (sp *YuniKornSchedulerPlugin) Filter(_ context.Context, _ fwk.CycleState,
pod *v1.Pod, nodeInfo fwk.NodeInfo) *fwk.Status {
- log.Log(log.ShimSchedulerPlugin).Debug("Filter check",
- zap.String("namespace", pod.Namespace),
- zap.String("pod", pod.Name),
- zap.String("node", nodeInfo.Node().Name))
-
- // we don't process pods without appID defined
- appID := utils.GetApplicationIDFromPod(pod)
- if appID == "" {
- log.Log(log.ShimSchedulerPlugin).Debug("Releasing non-managed
Pod fo scheduling (Filter phase)",
- zap.String("namespace", pod.Namespace),
- zap.String("pod", pod.Name))
- return nil
- }
-
- taskID := string(pod.UID)
- if _, task, ok := sp.getTask(appID, taskID); ok {
- if task.GetTaskState() == cache.TaskStates().Bound {
- // attempt to start a pod allocation. Filter() gets
called once per {Pod,Node} candidate; we only want
- // to proceed in the case where the Node we are asked
about matches the one YuniKorn has selected.
- // this check is fairly cheap (one map lookup); if we
fail the check here the scheduling framework will
- // immediately call Filter() again with a different
candidate Node.
- if sp.context.StartPodAllocation(taskID,
nodeInfo.Node().Name) {
-
log.Log(log.ShimSchedulerPlugin).Info("Releasing pod for scheduling (Filter
phase)",
- zap.String("namespace", pod.Namespace),
- zap.String("pod", pod.Name),
- zap.String("taskID", taskID),
- zap.String("assignedNode",
nodeInfo.Node().Name))
- return nil
- }
- }
- }
-
- return fwk.NewStatus(fwk.UnschedulableAndUnresolvable, "Pod is not fit
for node")
-}
-
-func (sp *YuniKornSchedulerPlugin) EventsToRegister(_ context.Context)
([]fwk.ClusterEventWithHint, error) {
- return sp.context.EventsToRegister(func(_ klog.Logger, pod *v1.Pod, _,
_ interface{}) (fwk.QueueingHint, error) {
- // adapt our simpler function to the QueueingHintFn contract
- return sp.queueingHint(pod)
- }), nil
-}
-
-// queueingHint is used to perform a lightweight check to determine if any object change may cause a pod to become
-// schedulable when it was not previously. Since YuniKorn maintains its own internal scheduling state, only the pod
-// is needed. This function will only be called on a previously unschedulable pod by this plugin -- therefore this
-// is definitely a YuniKorn pod.
-func (sp *YuniKornSchedulerPlugin) queueingHint(pod *v1.Pod)
(fwk.QueueingHint, error) {
- // Use the context's bloom filter to rule out this task if it is not
present. Given a large backlog,
- // this will almost always return false and we can skip re-enqueue.
- taskID := string(pod.UID)
- if !sp.context.IsTaskMaybeSchedulable(taskID) {
- return fwk.QueueSkip, nil
- }
-
- return fwk.Queue, nil
-}
-
-// PostBind is used to mark allocations as completed once scheduling run is finished
-func (sp *YuniKornSchedulerPlugin) PostBind(_ context.Context, _
fwk.CycleState, pod *v1.Pod, nodeName string) {
- log.Log(log.ShimSchedulerPlugin).Debug("PostBind handler",
- zap.String("namespace", pod.Namespace),
- zap.String("pod", pod.Name),
- zap.String("assignedNode", nodeName))
-
- // we don't process pods without appID defined
- appID := utils.GetApplicationIDFromPod(pod)
- if appID == "" {
- log.Log(log.ShimSchedulerPlugin).Debug("Non-managed Pod bound
successfully",
- zap.String("namespace", pod.Namespace),
- zap.String("pod", pod.Name))
- return
- }
-
- taskID := string(pod.UID)
- if _, _, ok := sp.getTask(appID, taskID); ok {
- log.Log(log.ShimSchedulerPlugin).Info("Managed Pod bound
successfully",
- zap.String("namespace", pod.Namespace),
- zap.String("pod", pod.Name),
- zap.String("taskID", taskID),
- zap.String("assignedNode", nodeName))
- sp.context.RemovePodAllocation(taskID)
- }
-}
-
-// NewSchedulerPlugin initializes a new plugin and returns it
-func NewSchedulerPlugin(_ context.Context, _ runtime.Object, handle
framework.Handle) (framework.Plugin, error) {
- log.Log(log.ShimSchedulerPlugin).Info(conf.GetBuildInfoString())
- log.Log(log.ShimSchedulerPlugin).Warn("The plugin mode has been
deprecated and will be removed in a future release. Consider migrating to
YuniKorn standalone mode.")
-
- configMaps, err := client.LoadBootstrapConfigMaps()
- if err != nil {
- log.Log(log.ShimSchedulerPlugin).Fatal("Unable to bootstrap
configuration", zap.Error(err))
- }
-
- err = conf.UpdateConfigMaps(configMaps, true)
- if err != nil {
- log.Log(log.ShimSchedulerPlugin).Fatal("Unable to load initial
configmaps", zap.Error(err))
- }
-
- // start the YK core scheduler
- serviceContext :=
entrypoint.StartAllServicesWithLogger(log.RootLogger(), log.GetZapConfigs())
- if serviceContext.RMProxy == nil {
- return nil, fmt.Errorf("internal error: serviceContext should
implement interface api.SchedulerAPI")
- }
-
- // we need our own informer factory here because the informers we get
from the framework handle aren't yet initialized
- informerFactory :=
informers.NewSharedInformerFactory(handle.ClientSet(), 0)
- ss := shim.NewShimSchedulerForPlugin(serviceContext.RMProxy,
informerFactory, conf.GetSchedulerConf(), configMaps)
- if err := ss.Run(); err != nil {
- log.Log(log.ShimSchedulerPlugin).Fatal("Unable to start
scheduler", zap.Error(err))
- }
-
- context := ss.GetContext()
- context.SetPodActivator(func(logger klog.Logger, pod *v1.Pod) {
- handle.Activate(logger, map[string]*v1.Pod{pod.Name: pod})
- })
- p := &YuniKornSchedulerPlugin{
- context: context,
- }
- events.SetRecorder(handle.EventRecorder())
- return p, nil
-}
-
-func (sp *YuniKornSchedulerPlugin) getTask(appID, taskID string) (app
*cache.Application, task *cache.Task, ok bool) {
- if app := sp.context.GetApplication(appID); app != nil {
- if task := app.GetTask(taskID); task != nil {
- return app, task, true
- }
- }
- return nil, nil, false
-}
-
-func (sp *YuniKornSchedulerPlugin) failTask(pod *v1.Pod, app
*cache.Application, task *cache.Task) {
- taskID := task.GetTaskID()
- log.Log(log.ShimSchedulerPlugin).Info("Task failed scheduling, marking
as rejected",
- zap.String("namespace", pod.Namespace),
- zap.String("pod", pod.Name),
- zap.String("taskID", taskID))
- sp.context.RemovePodAllocation(taskID)
- dispatcher.Dispatch(cache.NewRejectTaskEvent(app.GetApplicationID(),
taskID, fmt.Sprintf("task %s rejected by scheduler", taskID)))
- task.SetTaskSchedulingState(cache.TaskSchedFailed)
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]