This is an automated email from the ASF dual-hosted git repository.

wilfreds pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-k8shim.git


The following commit(s) were added to refs/heads/master by this push:
     new 9487666c [YUNIKORN-2074] Remove scheduler state machine (#700)
9487666c is described below

commit 9487666c52415a79ac984747846762677b41a7f7
Author: Craig Condit <[email protected]>
AuthorDate: Thu Oct 26 13:09:37 2023 +1100

    [YUNIKORN-2074] Remove scheduler state machine (#700)
    
    Shim Scheduler startup does not need a state machine as the order is well
    defined. A failure at any point in the startup sequence will cause a
    failure of the whole scheduler.
    
    Closes: #700
    
    Signed-off-by: Wilfred Spiegelenburg <[email protected]>
---
 Makefile                            |   2 +-
 pkg/cmd/shim/main.go                |   4 +-
 pkg/plugin/scheduler_plugin.go      |   4 +-
 pkg/shim/scheduler.go               | 160 +++++++++----------------------
 pkg/shim/scheduler_graphviz_test.go |  61 ------------
 pkg/shim/scheduler_mock_test.go     |  25 ++---
 pkg/shim/scheduler_perf_test.go     |   5 +-
 pkg/shim/scheduler_state.go         | 183 ------------------------------------
 pkg/shim/scheduler_test.go          |  39 +-------
 9 files changed, 67 insertions(+), 416 deletions(-)

diff --git a/Makefile b/Makefile
index b2d06ed3..a2e0d589 100644
--- a/Makefile
+++ b/Makefile
@@ -510,7 +510,7 @@ bench:
 fsm_graph:
        @echo "generating FSM graphs"
        "$(GO)" clean -testcache
-       "$(GO)" test -tags graphviz -run 'Test.*FsmGraph' ./pkg/shim ./pkg/cache
+       "$(GO)" test -tags graphviz -run 'Test.*FsmGraph' ./pkg/cache
        scripts/generate-fsm-graph-images.sh
 
 # Remove generated build artifacts
diff --git a/pkg/cmd/shim/main.go b/pkg/cmd/shim/main.go
index 9f240670..cc6785df 100644
--- a/pkg/cmd/shim/main.go
+++ b/pkg/cmd/shim/main.go
@@ -51,7 +51,9 @@ func main() {
 
        if serviceContext.RMProxy != nil {
                ss := shim.NewShimScheduler(serviceContext.RMProxy, 
conf.GetSchedulerConf(), configMaps)
-               ss.Run()
+               if err := ss.Run(); err != nil {
+                       log.Log(log.Shim).Fatal("Unable to start scheduler", 
zap.Error(err))
+               }
 
                signalChan := make(chan os.Signal, 1)
                signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)
diff --git a/pkg/plugin/scheduler_plugin.go b/pkg/plugin/scheduler_plugin.go
index 4201c6f1..507c8ef9 100644
--- a/pkg/plugin/scheduler_plugin.go
+++ b/pkg/plugin/scheduler_plugin.go
@@ -271,7 +271,9 @@ func NewSchedulerPlugin(_ runtime.Object, handle 
framework.Handle) (framework.Pl
        // we need our own informer factory here because the informers we get 
from the framework handle aren't yet initialized
        informerFactory := 
informers.NewSharedInformerFactory(handle.ClientSet(), 0)
        ss := shim.NewShimSchedulerForPlugin(serviceContext.RMProxy, 
informerFactory, conf.GetSchedulerConf(), configMaps)
-       ss.Run()
+       if err := ss.Run(); err != nil {
+               log.Log(log.ShimSchedulerPlugin).Fatal("Unable to start 
scheduler", zap.Error(err))
+       }
 
        p := &YuniKornSchedulerPlugin{
                context: ss.GetContext(),
diff --git a/pkg/shim/scheduler.go b/pkg/shim/scheduler.go
index f0620b3b..9916eaae 100644
--- a/pkg/shim/scheduler.go
+++ b/pkg/shim/scheduler.go
@@ -19,12 +19,9 @@
 package shim
 
 import (
-       "context"
-       "os"
        "sync"
        "time"
 
-       "github.com/looplab/fsm"
        "go.uber.org/zap"
        v1 "k8s.io/api/core/v1"
        "k8s.io/apimachinery/pkg/util/wait"
@@ -35,7 +32,6 @@ import (
        "github.com/apache/yunikorn-k8shim/pkg/cache"
        "github.com/apache/yunikorn-k8shim/pkg/callback"
        "github.com/apache/yunikorn-k8shim/pkg/client"
-       "github.com/apache/yunikorn-k8shim/pkg/common/events"
        "github.com/apache/yunikorn-k8shim/pkg/common/utils"
        "github.com/apache/yunikorn-k8shim/pkg/conf"
        "github.com/apache/yunikorn-k8shim/pkg/dispatcher"
@@ -51,7 +47,6 @@ type KubernetesShim struct {
        appManager           *appmgmt.AppManagementService
        phManager            *cache.PlaceholderManager
        callback             api.ResourceManagerCallback
-       stateMachine         *fsm.FSM
        stopChan             chan struct{}
        lock                 *sync.RWMutex
        outstandingAppsFound bool
@@ -96,13 +91,11 @@ func newShimSchedulerInternal(ctx *cache.Context, 
apiFactory client.APIProvider,
                stopChan:             make(chan struct{}),
                lock:                 &sync.RWMutex{},
                outstandingAppsFound: false,
-               stateMachine:         newSchedulerState(),
        }
        // init dispatcher
        dispatcher.RegisterEventHandler(dispatcher.EventTypeApp, 
ctx.ApplicationEventHandler())
        dispatcher.RegisterEventHandler(dispatcher.EventTypeTask, 
ctx.TaskEventHandler())
        dispatcher.RegisterEventHandler(dispatcher.EventTypeNode, 
ctx.SchedulerNodeEventHandler())
-       dispatcher.RegisterEventHandler(dispatcher.EventTypeScheduler, 
ss.SchedulerEventHandler())
 
        return ss
 }
@@ -111,89 +104,37 @@ func (ss *KubernetesShim) GetContext() *cache.Context {
        return ss.context
 }
 
-func (ss *KubernetesShim) SchedulerEventHandler() func(obj interface{}) {
-       return func(obj interface{}) {
-               if event, ok := obj.(events.SchedulerEvent); ok {
-                       if ss.canHandle(event) {
-                               if err := ss.handle(event); err != nil {
-                                       
log.Log(log.ShimScheduler).Error("failed to handle scheduler event",
-                                               zap.String("event", 
event.GetEvent()),
-                                               zap.Error(err))
-                               }
-                       }
-               }
+func (ss *KubernetesShim) recoverSchedulerState() error {
+       log.Log(log.ShimScheduler).Info("recovering scheduler states")
+       // step 1: recover all applications
+       // this step, we collect all the existing allocated pods from 
api-server,
+       // identify the scheduling identity (aka applicationInfo) from the pod,
+       // and then add these applications to the scheduler.
+       if err := ss.appManager.WaitForRecovery(); err != nil {
+               // failed
+               log.Log(log.ShimScheduler).Error("scheduler recovery failed", 
zap.Error(err))
+               return err
        }
-}
 
-func (ss *KubernetesShim) register() {
-       if err := ss.registerShimLayer(); err != nil {
-               dispatcher.Dispatch(ShimSchedulerEvent{
-                       event: RegisterSchedulerFailed,
-               })
-       } else {
-               dispatcher.Dispatch(ShimSchedulerEvent{
-                       event: RegisterSchedulerSucceed,
-               })
+       // step 2: recover existing allocations
+       // this step, we collect all existing allocations (allocated pods) from 
api-server,
+       // rerun the scheduling for these allocations in order to restore 
scheduler-state,
+       // the rerun is like a replay, not an actual scheduling procedure.
+       recoverableAppManagers := make([]interfaces.Recoverable, 0)
+       for _, appMgr := range ss.appManager.GetAllManagers() {
+               if m, ok := appMgr.(interfaces.Recoverable); ok {
+                       recoverableAppManagers = append(recoverableAppManagers, 
m)
+               }
        }
-}
-
-func (ss *KubernetesShim) handleSchedulerFailure() {
-       ss.Stop()
-       // testmode will be true when mock scheduler intailize
-       if !conf.GetSchedulerConf().IsTestMode() {
-               os.Exit(1)
+       if err := ss.context.WaitForRecovery(recoverableAppManagers, 
5*time.Minute); err != nil {
+               // failed
+               log.Log(log.ShimScheduler).Error("scheduler recovery failed", 
zap.Error(err))
+               return err
        }
-}
-
-func (ss *KubernetesShim) triggerSchedulerStateRecovery() {
-       dispatcher.Dispatch(ShimSchedulerEvent{
-               event: RecoverScheduler,
-       })
-}
 
-func (ss *KubernetesShim) recoverSchedulerState() {
-       // run recovery process in a go routine
-       // do not block main thread
-       go func() {
-               log.Log(log.ShimScheduler).Info("recovering scheduler states")
-               // step 1: recover all applications
-               // this step, we collect all the existing allocated pods from 
api-server,
-               // identify the scheduling identity (aka applicationInfo) from 
the pod,
-               // and then add these applications to the scheduler.
-               if err := ss.appManager.WaitForRecovery(); err != nil {
-                       // failed
-                       log.Log(log.ShimScheduler).Error("scheduler recovery 
failed", zap.Error(err))
-                       dispatcher.Dispatch(ShimSchedulerEvent{
-                               event: RecoverSchedulerFailed,
-                       })
-                       return
-               }
-
-               // step 2: recover existing allocations
-               // this step, we collect all existing allocations (allocated 
pods) from api-server,
-               // rerun the scheduling for these allocations in order to 
restore scheduler-state,
-               // the rerun is like a replay, not a actual scheduling 
procedure.
-               recoverableAppManagers := make([]interfaces.Recoverable, 0)
-               for _, appMgr := range ss.appManager.GetAllManagers() {
-                       if m, ok := appMgr.(interfaces.Recoverable); ok {
-                               recoverableAppManagers = 
append(recoverableAppManagers, m)
-                       }
-               }
-               if err := ss.context.WaitForRecovery(recoverableAppManagers, 
5*time.Minute); err != nil {
-                       // failed
-                       log.Log(log.ShimScheduler).Error("scheduler recovery 
failed", zap.Error(err))
-                       dispatcher.Dispatch(ShimSchedulerEvent{
-                               event: RecoverSchedulerFailed,
-                       })
-                       return
-               }
-
-               // success
-               log.Log(log.ShimScheduler).Info("scheduler recovery succeed")
-               dispatcher.Dispatch(ShimSchedulerEvent{
-                       event: RecoverSchedulerSucceed,
-               })
-       }()
+       // success
+       log.Log(log.ShimScheduler).Info("scheduler recovery succeed")
+       return nil
 }
 
 func (ss *KubernetesShim) doScheduling() {
@@ -243,27 +184,6 @@ func (ss *KubernetesShim) registerShimLayer() error {
        return nil
 }
 
-func (ss *KubernetesShim) GetSchedulerState() string {
-       return ss.stateMachine.Current()
-}
-
-// event handling
-func (ss *KubernetesShim) handle(se events.SchedulerEvent) error {
-       ss.lock.Lock()
-       defer ss.lock.Unlock()
-       err := ss.stateMachine.Event(context.Background(), se.GetEvent(), ss)
-       if err != nil && err.Error() == "no transition" {
-               return err
-       }
-       return nil
-}
-
-func (ss *KubernetesShim) canHandle(se events.SchedulerEvent) bool {
-       ss.lock.RLock()
-       defer ss.lock.RUnlock()
-       return ss.stateMachine.Can(se.GetEvent())
-}
-
 // each schedule iteration, we scan all apps and triggers app state transition
 func (ss *KubernetesShim) schedule() {
        apps := ss.context.GetAllApplications()
@@ -274,7 +194,7 @@ func (ss *KubernetesShim) schedule() {
        }
 }
 
-func (ss *KubernetesShim) Run() {
+func (ss *KubernetesShim) Run() error {
        // NOTE: the order of starting these services matter,
        // please look at the comments before modifying the orders
 
@@ -289,19 +209,33 @@ func (ss *KubernetesShim) Run() {
        // run the client library code that communicates with Kubernetes
        ss.apiFactory.Start()
 
-       // register scheduler with scheduler core
-       // this triggers the scheduler state transition
-       // it first registers with the core, then start to do recovery,
-       // after the recovery is succeed, it goes to the normal scheduling 
routine
-       dispatcher.Dispatch(newRegisterSchedulerEvent())
+       // register shim with core
+       if err := ss.registerShimLayer(); err != nil {
+               log.Log(log.ShimScheduler).Error("failed to register shim with 
core", zap.Error(err))
+               ss.Stop()
+               return err
+       }
 
        // run app managers
        // the app manager launches the pod event handlers
        // it needs to be started after the shim is registered with the core
        if err := ss.appManager.Start(); err != nil {
-               log.Log(log.ShimScheduler).Fatal("failed to start app manager", 
zap.Error(err))
+               log.Log(log.ShimScheduler).Error("failed to start app manager", 
zap.Error(err))
+               ss.Stop()
+               return err
+       }
+
+       // recover scheduler state
+       if err := ss.recoverSchedulerState(); err != nil {
+               log.Log(log.ShimScheduler).Error("failed to recover scheduler 
state", zap.Error(err))
                ss.Stop()
+               return err
        }
+
+       // start scheduling loop
+       ss.doScheduling()
+
+       return nil
 }
 
 func (ss *KubernetesShim) Stop() {
diff --git a/pkg/shim/scheduler_graphviz_test.go 
b/pkg/shim/scheduler_graphviz_test.go
deleted file mode 100644
index 7f2da073..00000000
--- a/pkg/shim/scheduler_graphviz_test.go
+++ /dev/null
@@ -1,61 +0,0 @@
-//go:build graphviz
-// +build graphviz
-
-/*
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements.  See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership.  The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License.  You may obtain a copy of the License at
-
-     http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
-*/
-
-package shim
-
-import (
-       "fmt"
-       "os"
-       "testing"
-
-       "github.com/looplab/fsm"
-       "gotest.tools/v3/assert"
-
-       "github.com/apache/yunikorn-k8shim/pkg/appmgmt"
-       "github.com/apache/yunikorn-k8shim/pkg/cache"
-       "github.com/apache/yunikorn-k8shim/pkg/client"
-       "github.com/apache/yunikorn-k8shim/pkg/common/test"
-       "github.com/apache/yunikorn-scheduler-interface/lib/go/api"
-       "github.com/apache/yunikorn-scheduler-interface/lib/go/si"
-)
-
-func TestSchedulerFsmGraph(t *testing.T) {
-       var callback api.ResourceManagerCallback
-
-       mockedAMProtocol := cache.NewMockedAMProtocol()
-       mockedAPIProvider := client.NewMockedAPIProvider(false)
-       mockedAPIProvider.GetAPIs().SchedulerAPI = 
test.NewSchedulerAPIMock().RegisterFunction(
-               func(request *si.RegisterResourceManagerRequest,
-                       callback api.ResourceManagerCallback) (response 
*si.RegisterResourceManagerResponse, e error) {
-                       return nil, fmt.Errorf("some error")
-               })
-
-       ctx := cache.NewContext(mockedAPIProvider)
-       shim := newShimSchedulerInternal(ctx, mockedAPIProvider,
-               appmgmt.NewAMService(mockedAMProtocol, mockedAPIProvider), 
callback)
-
-       graph := fsm.Visualize(shim.stateMachine)
-
-       err := os.MkdirAll("../../build/fsm", 0755)
-       assert.NilError(t, err, "Creating output dir failed")
-       os.WriteFile("../../build/fsm/k8shim-scheduler-state.dot", 
[]byte(graph), 0644)
-       assert.NilError(t, err, "Writing graph failed")
-}
diff --git a/pkg/shim/scheduler_mock_test.go b/pkg/shim/scheduler_mock_test.go
index f430bdc3..51e9cde1 100644
--- a/pkg/shim/scheduler_mock_test.go
+++ b/pkg/shim/scheduler_mock_test.go
@@ -81,10 +81,14 @@ func (fc *MockScheduler) init() {
        fc.apiProvider = mockedAPIProvider
 }
 
-func (fc *MockScheduler) start() {
+func (fc *MockScheduler) start() error {
        fc.apiProvider.RunEventHandler() // must be called first
-       fc.scheduler.Run()
+       if err := fc.scheduler.Run(); err != nil {
+               fc.started.Store(false)
+               return err
+       }
        fc.started.Store(true)
+       return nil
 }
 
 func (fc *MockScheduler) updateConfig(queues string, extraConfig 
map[string]string) error {
@@ -172,23 +176,6 @@ func (fc *MockScheduler) addTask(appID string, taskID 
string, ask *si.Resource)
        })
 }
 
-func (fc *MockScheduler) waitForSchedulerState(t testing.TB, expectedState 
string) {
-       deadline := time.Now().Add(10 * time.Second)
-       for {
-               if fc.scheduler.GetSchedulerState() == expectedState {
-                       break
-               }
-               log.Log(log.Test).Info("waiting for scheduler state",
-                       zap.String("expected", expectedState),
-                       zap.String("actual", fc.scheduler.GetSchedulerState()))
-               time.Sleep(time.Second)
-               if time.Now().After(deadline) {
-                       t.Errorf("wait for scheduler to reach state %s failed, 
current state %s",
-                               expectedState, fc.scheduler.GetSchedulerState())
-               }
-       }
-}
-
 func (fc *MockScheduler) waitAndAssertApplicationState(t *testing.T, appID, 
expectedState string) {
        app := fc.context.GetApplication(appID)
        assert.Equal(t, app != nil, true)
diff --git a/pkg/shim/scheduler_perf_test.go b/pkg/shim/scheduler_perf_test.go
index 53c057bb..9c1afcae 100644
--- a/pkg/shim/scheduler_perf_test.go
+++ b/pkg/shim/scheduler_perf_test.go
@@ -80,7 +80,7 @@ func BenchmarkSchedulingThroughPut(b *testing.B) {
 
        cluster := &MockScheduler{}
        cluster.init()
-       cluster.start()
+       assert.NilError(b, cluster.start(), "failed to initialize cluster")
        defer cluster.stop()
 
        if profileCpu {
@@ -101,8 +101,7 @@ func BenchmarkSchedulingThroughPut(b *testing.B) {
                }(f)
        }
 
-       // init scheduler & update config
-       cluster.waitForSchedulerState(b, SchedulerStates().Running)
+       // update config
        err := cluster.updateConfig(queueConfig, map[string]string{
                "log.level": "WARN",
        })
diff --git a/pkg/shim/scheduler_state.go b/pkg/shim/scheduler_state.go
deleted file mode 100644
index dcce2735..00000000
--- a/pkg/shim/scheduler_state.go
+++ /dev/null
@@ -1,183 +0,0 @@
-/*
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements.  See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership.  The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License.  You may obtain a copy of the License at
-
-     http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
-*/
-
-package shim
-
-import (
-       "context"
-       "sync"
-
-       "github.com/looplab/fsm"
-       "go.uber.org/zap"
-
-       "github.com/apache/yunikorn-k8shim/pkg/common/events"
-       "github.com/apache/yunikorn-k8shim/pkg/log"
-)
-
-var schedulerStatesOnce sync.Once
-
-// ----------------------------------------------
-// Scheduler events
-// ----------------------------------------------
-type SchedulerEventType int
-
-const (
-       RegisterScheduler SchedulerEventType = iota
-       RegisterSchedulerSucceed
-       RegisterSchedulerFailed
-       RecoverScheduler
-       RecoverSchedulerSucceed
-       RecoverSchedulerFailed
-)
-
-func (ae SchedulerEventType) String() string {
-       return [...]string{"RegisterScheduler", "RegisterSchedulerSucceed", 
"RegisterSchedulerFailed", "RecoverScheduler", "RecoverSchedulerSucceed", 
"RecoverSchedulerFailed"}[ae]
-}
-
-type ShimSchedulerEvent struct { //nolint:golint
-       event SchedulerEventType
-}
-
-func (rs ShimSchedulerEvent) GetEvent() string {
-       return rs.event.String()
-}
-
-func (rs ShimSchedulerEvent) GetArgs() []interface{} {
-       return nil
-}
-
-// -------------------------------------------------------------------
-// event to trigger scheduler registration
-// --------------------------------------------------------------------
-type RegisterSchedulerEvent struct {
-       event SchedulerEventType
-}
-
-func newRegisterSchedulerEvent() RegisterSchedulerEvent {
-       return RegisterSchedulerEvent{
-               event: RegisterScheduler,
-       }
-}
-
-func (rs RegisterSchedulerEvent) GetEvent() string {
-       return rs.event.String()
-}
-
-func (rs RegisterSchedulerEvent) GetArgs() []interface{} {
-       return nil
-}
-
-// ----------------------------------
-// Scheduler states
-// ----------------------------------
-var storeScheduleStates *SStates
-
-type SStates struct {
-       New         string
-       Registered  string
-       Registering string
-       Recovering  string
-       Running     string
-       Draining    string
-       Stopped     string
-}
-
-func SchedulerStates() *SStates {
-       schedulerStatesOnce.Do(func() {
-               storeScheduleStates = &SStates{
-                       New:         "New",
-                       Registered:  "Registered",
-                       Registering: "Registering",
-                       Recovering:  "Recovering",
-                       Running:     "Running",
-                       Draining:    "Draining",
-                       Stopped:     "Stopped",
-               }
-       })
-       return storeScheduleStates
-}
-
-func newSchedulerState() *fsm.FSM {
-       states := SchedulerStates()
-       return fsm.NewFSM(
-               states.New, fsm.Events{
-                       {
-                               Name: RegisterScheduler.String(),
-                               Src:  []string{states.New},
-                               Dst:  states.Registering,
-                       },
-                       {
-                               Name: RegisterSchedulerSucceed.String(),
-                               Src:  []string{states.Registering},
-                               Dst:  states.Registered,
-                       },
-                       {
-                               Name: RegisterSchedulerFailed.String(),
-                               Src:  []string{states.Registering},
-                               Dst:  states.Stopped,
-                       },
-                       {
-                               Name: RecoverScheduler.String(),
-                               Src:  []string{states.Registered},
-                               Dst:  states.Recovering,
-                       },
-                       {
-                               Name: RecoverSchedulerSucceed.String(),
-                               Src:  []string{states.Recovering},
-                               Dst:  states.Running,
-                       },
-                       {
-                               Name: RecoverSchedulerFailed.String(),
-                               Src:  []string{states.Recovering},
-                               Dst:  states.Stopped,
-                       },
-               },
-               fsm.Callbacks{
-                       events.EnterState: func(_ context.Context, event 
*fsm.Event) {
-                               log.Log(log.ShimFSM).Debug("scheduler shim 
state transition",
-                                       zap.String("source", event.Src),
-                                       zap.String("destination", event.Dst),
-                                       zap.String("event", event.Event))
-                       },
-                       states.Registered: func(_ context.Context, event 
*fsm.Event) {
-                               scheduler := event.Args[0].(*KubernetesShim) 
//nolint:errcheck
-                               scheduler.triggerSchedulerStateRecovery()    // 
if reaches registered, trigger recovering
-                       },
-                       states.Recovering: func(_ context.Context, event 
*fsm.Event) {
-                               scheduler := event.Args[0].(*KubernetesShim) 
//nolint:errcheck
-                               scheduler.recoverSchedulerState()            // 
do recovering
-                       },
-                       states.Running: func(_ context.Context, event 
*fsm.Event) {
-                               scheduler := event.Args[0].(*KubernetesShim) 
//nolint:errcheck
-                               scheduler.doScheduling()                     // 
do scheduling
-                       },
-                       RegisterScheduler.String(): func(_ context.Context, 
event *fsm.Event) {
-                               scheduler := event.Args[0].(*KubernetesShim) 
//nolint:errcheck
-                               scheduler.register()                         // 
trigger registration
-                       },
-                       RegisterSchedulerFailed.String(): func(_ 
context.Context, event *fsm.Event) {
-                               scheduler := event.Args[0].(*KubernetesShim) 
//nolint:errcheck
-                               scheduler.handleSchedulerFailure()           // 
registration failed, stop the scheduler
-                       },
-                       RecoverSchedulerFailed.String(): func(_ 
context.Context, event *fsm.Event) {
-                               scheduler := event.Args[0].(*KubernetesShim) 
//nolint:errcheck
-                               scheduler.handleSchedulerFailure()           // 
recovery failed
-                       },
-               },
-       )
-}
diff --git a/pkg/shim/scheduler_test.go b/pkg/shim/scheduler_test.go
index 4f563465..f8387c6a 100644
--- a/pkg/shim/scheduler_test.go
+++ b/pkg/shim/scheduler_test.go
@@ -21,9 +21,7 @@ package shim
 import (
        "fmt"
        "testing"
-       "time"
 
-       "go.uber.org/zap"
        "gotest.tools/v3/assert"
        v1 "k8s.io/api/core/v1"
 
@@ -32,7 +30,6 @@ import (
        "github.com/apache/yunikorn-k8shim/pkg/client"
        "github.com/apache/yunikorn-k8shim/pkg/common"
        "github.com/apache/yunikorn-k8shim/pkg/common/test"
-       "github.com/apache/yunikorn-k8shim/pkg/log"
        "github.com/apache/yunikorn-scheduler-interface/lib/go/api"
        siCommon "github.com/apache/yunikorn-scheduler-interface/lib/go/common"
        "github.com/apache/yunikorn-scheduler-interface/lib/go/si"
@@ -58,11 +55,9 @@ partitions:
        // init and register scheduler
        cluster := MockScheduler{}
        cluster.init()
-       cluster.start()
+       assert.NilError(t, cluster.start(), "failed to start cluster")
        defer cluster.stop()
 
-       // ensure scheduler running
-       cluster.waitForSchedulerState(t, SchedulerStates().Running)
        err := cluster.updateConfig(configData, nil)
        assert.NilError(t, err, "update config failed")
        nodeLabels := map[string]string{
@@ -112,11 +107,9 @@ partitions:
        // init and register scheduler
        cluster := MockScheduler{}
        cluster.init()
-       cluster.start()
+       assert.NilError(t, cluster.start(), "failed to start cluster")
        defer cluster.stop()
 
-       // ensure scheduler state
-       cluster.waitForSchedulerState(t, SchedulerStates().Running)
        err := cluster.updateConfig(configData, nil)
        assert.NilError(t, err, "update config failed")
 
@@ -173,11 +166,8 @@ func TestSchedulerRegistrationFailed(t *testing.T) {
        ctx := cache.NewContext(mockedAPIProvider)
        shim := newShimSchedulerInternal(ctx, mockedAPIProvider,
                appmgmt.NewAMService(mockedAMProtocol, mockedAPIProvider), 
callback)
-       shim.Run()
-       defer shim.Stop()
-
-       err := waitShimSchedulerState(shim, SchedulerStates().Stopped, 
5*time.Second)
-       assert.NilError(t, err)
+       assert.Error(t, shim.Run(), "some error")
+       shim.Stop()
 }
 
 func TestTaskFailures(t *testing.T) {
@@ -203,7 +193,7 @@ partitions:
        // init and register scheduler
        cluster := MockScheduler{}
        cluster.init()
-       cluster.start()
+       assert.NilError(t, cluster.start(), "failed to start cluster")
        defer cluster.stop()
 
        // mock pod bind failures
@@ -214,8 +204,6 @@ partitions:
                return nil
        })
 
-       // ensure scheduler state
-       cluster.waitForSchedulerState(t, SchedulerStates().Running)
        err := cluster.updateConfig(configData, nil)
        assert.NilError(t, err, "update config failed")
 
@@ -248,20 +236,3 @@ partitions:
                "[mycluster]default", "app0001", 1)
        assert.NilError(t, err, "number of allocations is not expected, error")
 }
-
-func waitShimSchedulerState(shim *KubernetesShim, expectedState string, 
timeout time.Duration) error {
-       deadline := time.Now().Add(timeout)
-       for {
-               if shim.GetSchedulerState() == expectedState {
-                       log.Log(log.Test).Info("waiting for state",
-                               zap.String("expect", expectedState),
-                               zap.String("current", shim.GetSchedulerState()))
-                       return nil
-               }
-               time.Sleep(1 * time.Second)
-               if time.Now().After(deadline) {
-                       return fmt.Errorf("scheduler has not reached expected 
state %s in %d seconds, current state: %s",
-                               expectedState, deadline.Second(), 
shim.GetSchedulerState())
-               }
-       }
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to