This is an automated email from the ASF dual-hosted git repository.
manirajv06 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-k8shim.git
The following commit(s) were added to refs/heads/master by this push:
new 0eac878a [YUNIKORN-3276] Remove code: Clean up context in shim repo (#1025)
0eac878a is described below
commit 0eac878a3bbfa0d85eca69bf1852f9535a63d7a5
Author: Manikandan R <[email protected]>
AuthorDate: Thu May 7 10:00:46 2026 +0530
[YUNIKORN-3276] Remove code: Clean up context in shim repo (#1025)
Closes: #1025
Signed-off-by: Manikandan R <[email protected]>
---
pkg/cache/context.go | 40 ------------
pkg/cache/context_test.go | 115 ----------------------------------
pkg/cache/external/scheduler_cache.go | 59 -----------------
3 files changed, 214 deletions(-)
diff --git a/pkg/cache/context.go b/pkg/cache/context.go
index ec6f7c3b..850194ae 100644
--- a/pkg/cache/context.go
+++ b/pkg/cache/context.go
@@ -69,13 +69,11 @@ type Context struct {
schedulerCache *schedulercache.SchedulerCache // external cache
apiProvider client.APIProvider // apis to interact with
api-server, scheduler-core, etc
predManager predicates.PredicateManager // K8s predicates
- pluginMode bool // true if we are
configured as a scheduler plugin
namespace string // yunikorn namespace
configMaps []*v1.ConfigMap // cached yunikorn
configmaps
lock *locking.RWMutex // lock - used not only
for context data but also to ensure that multiple event types are not executed
concurrently
txnID atomic.Uint64 // transaction ID counter
klogger klog.Logger
- podActivator atomic.Value
}
// NewContext create a new context for the scheduler using a default (empty) configuration
@@ -112,18 +110,6 @@ func NewContextWithBootstrapConfigMaps(apis
client.APIProvider, bootstrapConfigM
return ctx
}
-// SetPodActivator is used by the plugin mode to add a callback function to
reschedule a pod
-func (ctx *Context) SetPodActivator(podActivator func(logger klog.Logger, pod
*v1.Pod)) {
- ctx.podActivator.Store(podActivator)
-}
-
-// ActivatePod is used to tell Kubernetes to re-schedule a pod when using
plugin mode
-func (ctx *Context) ActivatePod(pod *v1.Pod) {
- if activator, ok := ctx.podActivator.Load().(func(logger klog.Logger,
pod *v1.Pod)); ok && activator != nil {
- activator(ctx.klogger, pod)
- }
-}
-
func (ctx *Context) AddSchedulingEventHandlers() error {
err := ctx.apiProvider.AddEventHandler(&client.ResourceEventHandlers{
Type: client.ConfigMapInformerHandlers,
@@ -170,10 +156,6 @@ func (ctx *Context) AddSchedulingEventHandlers() error {
return nil
}
-func (ctx *Context) IsPluginMode() bool {
- return ctx.pluginMode
-}
-
func (ctx *Context) addNode(obj interface{}) {
ctx.updateNode(nil, obj)
}
@@ -890,28 +872,6 @@ func (ctx *Context) IsTaskMaybeSchedulable(taskID string)
bool {
return ctx.schedulerCache.IsTaskMaybeSchedulable(taskID)
}
-func (ctx *Context) AddPendingPodAllocation(podKey string, nodeID string) {
- ctx.schedulerCache.AddPendingPodAllocation(podKey, nodeID)
-}
-
-func (ctx *Context) RemovePodAllocation(podKey string) {
- ctx.schedulerCache.RemovePodAllocation(podKey)
-}
-
-func (ctx *Context) GetPendingPodAllocation(podKey string) (nodeID string, ok
bool) {
- nodeID, ok = ctx.schedulerCache.GetPendingPodAllocation(podKey)
- return nodeID, ok
-}
-
-func (ctx *Context) GetInProgressPodAllocation(podKey string) (nodeID string,
ok bool) {
- nodeID, ok = ctx.schedulerCache.GetInProgressPodAllocation(podKey)
- return nodeID, ok
-}
-
-func (ctx *Context) StartPodAllocation(podKey string, nodeID string) bool {
- return ctx.schedulerCache.StartPodAllocation(podKey, nodeID)
-}
-
func (ctx *Context) notifyTaskComplete(app *Application, taskID string) {
if app == nil {
log.Log(log.ShimContext).Debug("In notifyTaskComplete but app is nil",
diff --git a/pkg/cache/context_test.go b/pkg/cache/context_test.go
index 7732d7d2..466bba00 100644
--- a/pkg/cache/context_test.go
+++ b/pkg/cache/context_test.go
@@ -1648,121 +1648,6 @@ func TestAddApplicationsWithTags(t *testing.T) {
}
}
-func TestPendingPodAllocations(t *testing.T) {
- utils.SetPluginMode(true)
- defer utils.SetPluginMode(false)
-
- context, apiProvider := initContextAndAPIProviderForTest()
- dispatcher.Start()
- defer dispatcher.UnregisterAllEventHandlers()
- defer dispatcher.Stop()
-
- apiProvider.MockSchedulerAPIUpdateNodeFn(func(request *si.NodeRequest)
error {
- for _, node := range request.Nodes {
- dispatcher.Dispatch(CachedSchedulerNodeEvent{
- NodeID: node.NodeID,
- Event: NodeAccepted,
- })
- }
- return nil
- })
-
- node1 := v1.Node{
- ObjectMeta: apis.ObjectMeta{
- Name: Host1,
- Namespace: "default",
- UID: uid1,
- },
- }
- context.addNode(&node1)
-
- node2 := v1.Node{
- ObjectMeta: apis.ObjectMeta{
- Name: Host2,
- Namespace: "default",
- UID: uid2,
- },
- }
- context.addNode(&node2)
-
- // add a new application
- context.AddApplication(&AddApplicationRequest{
- Metadata: ApplicationMetadata{
- ApplicationID: appID1,
- QueueName: queueNameA,
- User: testUser,
- Tags: nil,
- },
- })
-
- pod := &v1.Pod{
- TypeMeta: apis.TypeMeta{
- Kind: "Pod",
- APIVersion: "v1",
- },
- ObjectMeta: apis.ObjectMeta{
- Name: taskUID1,
- UID: uid1,
- },
- }
-
- // add a tasks to the existing application
- task := context.AddTask(&AddTaskRequest{
- Metadata: TaskMetadata{
- ApplicationID: appID1,
- TaskID: taskUID1,
- Pod: pod,
- },
- })
- assert.Assert(t, task != nil, "task was nil")
-
- // add the allocation
- context.AddPendingPodAllocation(uid1, Host1)
-
- // validate that the pending allocation matches
- nodeID, ok := context.GetPendingPodAllocation(uid1)
- if !ok {
- t.Fatalf("no pending pod allocation found")
- }
- assert.Equal(t, nodeID, Host1, "wrong host")
-
- // validate that there is not an in-progress allocation
- if _, ok = context.GetInProgressPodAllocation(uid1); ok {
- t.Fatalf("in-progress allocation exists when it should be
pending")
- }
-
- if context.StartPodAllocation(uid1, Host2) {
- t.Fatalf("attempt to start pod allocation on wrong node
succeeded")
- }
-
- if !context.StartPodAllocation(uid1, Host1) {
- t.Fatalf("attempt to start pod allocation on correct node
failed")
- }
-
- if _, ok = context.GetPendingPodAllocation(uid1); ok {
- t.Fatalf("pending pod allocation still exists after transition
to in-progress")
- }
-
- nodeID, ok = context.GetInProgressPodAllocation(uid1)
- if !ok {
- t.Fatalf("in-progress allocation does not exist")
- }
- assert.Equal(t, nodeID, Host1, "wrong host")
-
- context.RemovePodAllocation(uid1)
- if _, ok = context.GetInProgressPodAllocation(uid1); ok {
- t.Fatalf("in-progress pod allocation still exists after
removal")
- }
-
- // re-add to validate pending pod removal
- context.AddPendingPodAllocation(uid1, Host1)
- context.RemovePodAllocation(uid1)
-
- if _, ok = context.GetPendingPodAllocation(uid1); ok {
- t.Fatalf("pending pod allocation still exists after removal")
- }
-}
-
func TestGetStateDump(t *testing.T) {
context := initContextForTest()
diff --git a/pkg/cache/external/scheduler_cache.go
b/pkg/cache/external/scheduler_cache.go
index 0e5c77f3..c1217b98 100644
--- a/pkg/cache/external/scheduler_cache.go
+++ b/pkg/cache/external/scheduler_cache.go
@@ -309,65 +309,6 @@ func (cache *SchedulerCache) IsTaskMaybeSchedulable(taskID
string) bool {
return cache.taskBloomFilterRef.Load().isTaskMaybePresent(taskID)
}
-// AddPendingPodAllocation is used to add a new pod -> node mapping to the
cache when running in scheduler plugin mode.
-// This function is called (in plugin mode) after a task is allocated by the
YuniKorn scheduler.
-func (cache *SchedulerCache) AddPendingPodAllocation(podKey string, nodeID
string) {
- cache.lock.Lock()
- defer cache.lock.Unlock()
- cache.dumpState("AddPendingPodAllocation.Pre")
- defer cache.dumpState("AddPendingPodAllocation.Post")
- delete(cache.inProgressAllocations, podKey)
- cache.pendingAllocations[podKey] = nodeID
-}
-
-// RemovePodAllocation is used to remove a pod -> node mapping from the cache
when running in scheduler plugin
-// mode. It removes both pending and in-progress allocations. This function is
called (via cache) from the scheduler
-// plugin in PreFilter() if a previous allocation was found, and in PostBind()
to cleanup the allocation since it is no
-// longer relevant.
-func (cache *SchedulerCache) RemovePodAllocation(podKey string) {
- cache.lock.Lock()
- defer cache.lock.Unlock()
- cache.dumpState("RemovePendingPodAllocation.Pre")
- defer cache.dumpState("RemovePendingPodAllocation.Post")
- delete(cache.pendingAllocations, podKey)
- delete(cache.inProgressAllocations, podKey)
-}
-
-// GetPendingPodAllocation is used in scheduler plugin mode to retrieve a
pending pod allocation. A pending
-// allocation is one which has been decided upon by YuniKorn but has not yet
been communicated to the default scheduler.
-func (cache *SchedulerCache) GetPendingPodAllocation(podKey string) (nodeID
string, ok bool) {
- cache.lock.RLock()
- defer cache.lock.RUnlock()
- res, ok := cache.pendingAllocations[podKey]
- return res, ok
-}
-
-// GetInProgressPodAllocation is used in scheduler plugin mode to retrieve an
in-progress pod allocation. An in-progress
-// allocation is one which has been communicated to the default scheduler, but
has not yet been bound.
-func (cache *SchedulerCache) GetInProgressPodAllocation(podKey string) (nodeID
string, ok bool) {
- cache.lock.RLock()
- defer cache.lock.RUnlock()
- res, ok := cache.inProgressAllocations[podKey]
- return res, ok
-}
-
-// StartPodAllocation is used in scheduler plugin mode to transition a pod
allocation from pending to in-progress. If
-// the given pod has a pending allocation on the given node, the allocation is
marked as in-progress and this function
-// returns true. If the pod is not pending or is pending on another node, this
function does nothing and returns false.
-func (cache *SchedulerCache) StartPodAllocation(podKey string, nodeID string)
bool {
- cache.lock.Lock()
- defer cache.lock.Unlock()
- cache.dumpState("StartPendingPodAllocation.Pre")
- defer cache.dumpState("StartPendingPodAllocation.Post")
- expectedNodeID, ok := cache.pendingAllocations[podKey]
- if ok && expectedNodeID == nodeID {
- delete(cache.pendingAllocations, podKey)
- cache.inProgressAllocations[podKey] = nodeID
- return true
- }
- return false
-}
-
// IsAssumedPod returns if pod is assumed in cache, avoid nil
func (cache *SchedulerCache) IsAssumedPod(podKey string) bool {
cache.lock.RLock()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]