This is an automated email from the ASF dual-hosted git repository.
wilfreds pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-k8shim.git
The following commit(s) were added to refs/heads/master by this push:
new 9487666c [YUNIKORN-2074] Remove scheduler state machine (#700)
9487666c is described below
commit 9487666c52415a79ac984747846762677b41a7f7
Author: Craig Condit <[email protected]>
AuthorDate: Thu Oct 26 13:09:37 2023 +1100
[YUNIKORN-2074] Remove scheduler state machine (#700)
The shim scheduler startup does not need a state machine because the startup
order is well defined. A failure at any point in the startup sequence causes
the whole scheduler to fail.
Closes: #700
Signed-off-by: Wilfred Spiegelenburg <[email protected]>
---
Makefile | 2 +-
pkg/cmd/shim/main.go | 4 +-
pkg/plugin/scheduler_plugin.go | 4 +-
pkg/shim/scheduler.go | 160 +++++++++----------------------
pkg/shim/scheduler_graphviz_test.go | 61 ------------
pkg/shim/scheduler_mock_test.go | 25 ++---
pkg/shim/scheduler_perf_test.go | 5 +-
pkg/shim/scheduler_state.go | 183 ------------------------------------
pkg/shim/scheduler_test.go | 39 +-------
9 files changed, 67 insertions(+), 416 deletions(-)
diff --git a/Makefile b/Makefile
index b2d06ed3..a2e0d589 100644
--- a/Makefile
+++ b/Makefile
@@ -510,7 +510,7 @@ bench:
fsm_graph:
@echo "generating FSM graphs"
"$(GO)" clean -testcache
- "$(GO)" test -tags graphviz -run 'Test.*FsmGraph' ./pkg/shim ./pkg/cache
+ "$(GO)" test -tags graphviz -run 'Test.*FsmGraph' ./pkg/cache
scripts/generate-fsm-graph-images.sh
# Remove generated build artifacts
diff --git a/pkg/cmd/shim/main.go b/pkg/cmd/shim/main.go
index 9f240670..cc6785df 100644
--- a/pkg/cmd/shim/main.go
+++ b/pkg/cmd/shim/main.go
@@ -51,7 +51,9 @@ func main() {
if serviceContext.RMProxy != nil {
ss := shim.NewShimScheduler(serviceContext.RMProxy,
conf.GetSchedulerConf(), configMaps)
- ss.Run()
+ if err := ss.Run(); err != nil {
+ log.Log(log.Shim).Fatal("Unable tto start scheduler",
zap.Error(err))
+ }
signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)
diff --git a/pkg/plugin/scheduler_plugin.go b/pkg/plugin/scheduler_plugin.go
index 4201c6f1..507c8ef9 100644
--- a/pkg/plugin/scheduler_plugin.go
+++ b/pkg/plugin/scheduler_plugin.go
@@ -271,7 +271,9 @@ func NewSchedulerPlugin(_ runtime.Object, handle
framework.Handle) (framework.Pl
// we need our own informer factory here because the informers we get
from the framework handle aren't yet initialized
informerFactory :=
informers.NewSharedInformerFactory(handle.ClientSet(), 0)
ss := shim.NewShimSchedulerForPlugin(serviceContext.RMProxy,
informerFactory, conf.GetSchedulerConf(), configMaps)
- ss.Run()
+ if err := ss.Run(); err != nil {
+ log.Log(log.ShimSchedulerPlugin).Fatal("Unable to start
scheduler", zap.Error(err))
+ }
p := &YuniKornSchedulerPlugin{
context: ss.GetContext(),
diff --git a/pkg/shim/scheduler.go b/pkg/shim/scheduler.go
index f0620b3b..9916eaae 100644
--- a/pkg/shim/scheduler.go
+++ b/pkg/shim/scheduler.go
@@ -19,12 +19,9 @@
package shim
import (
- "context"
- "os"
"sync"
"time"
- "github.com/looplab/fsm"
"go.uber.org/zap"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/wait"
@@ -35,7 +32,6 @@ import (
"github.com/apache/yunikorn-k8shim/pkg/cache"
"github.com/apache/yunikorn-k8shim/pkg/callback"
"github.com/apache/yunikorn-k8shim/pkg/client"
- "github.com/apache/yunikorn-k8shim/pkg/common/events"
"github.com/apache/yunikorn-k8shim/pkg/common/utils"
"github.com/apache/yunikorn-k8shim/pkg/conf"
"github.com/apache/yunikorn-k8shim/pkg/dispatcher"
@@ -51,7 +47,6 @@ type KubernetesShim struct {
appManager *appmgmt.AppManagementService
phManager *cache.PlaceholderManager
callback api.ResourceManagerCallback
- stateMachine *fsm.FSM
stopChan chan struct{}
lock *sync.RWMutex
outstandingAppsFound bool
@@ -96,13 +91,11 @@ func newShimSchedulerInternal(ctx *cache.Context,
apiFactory client.APIProvider,
stopChan: make(chan struct{}),
lock: &sync.RWMutex{},
outstandingAppsFound: false,
- stateMachine: newSchedulerState(),
}
// init dispatcher
dispatcher.RegisterEventHandler(dispatcher.EventTypeApp,
ctx.ApplicationEventHandler())
dispatcher.RegisterEventHandler(dispatcher.EventTypeTask,
ctx.TaskEventHandler())
dispatcher.RegisterEventHandler(dispatcher.EventTypeNode,
ctx.SchedulerNodeEventHandler())
- dispatcher.RegisterEventHandler(dispatcher.EventTypeScheduler,
ss.SchedulerEventHandler())
return ss
}
@@ -111,89 +104,37 @@ func (ss *KubernetesShim) GetContext() *cache.Context {
return ss.context
}
-func (ss *KubernetesShim) SchedulerEventHandler() func(obj interface{}) {
- return func(obj interface{}) {
- if event, ok := obj.(events.SchedulerEvent); ok {
- if ss.canHandle(event) {
- if err := ss.handle(event); err != nil {
-
log.Log(log.ShimScheduler).Error("failed to handle scheduler event",
- zap.String("event",
event.GetEvent()),
- zap.Error(err))
- }
- }
- }
+func (ss *KubernetesShim) recoverSchedulerState() error {
+ log.Log(log.ShimScheduler).Info("recovering scheduler states")
+ // step 1: recover all applications
+ // this step, we collect all the existing allocated pods from
api-server,
+ // identify the scheduling identity (aka applicationInfo) from the pod,
+ // and then add these applications to the scheduler.
+ if err := ss.appManager.WaitForRecovery(); err != nil {
+ // failed
+ log.Log(log.ShimScheduler).Error("scheduler recovery failed",
zap.Error(err))
+ return err
}
-}
-func (ss *KubernetesShim) register() {
- if err := ss.registerShimLayer(); err != nil {
- dispatcher.Dispatch(ShimSchedulerEvent{
- event: RegisterSchedulerFailed,
- })
- } else {
- dispatcher.Dispatch(ShimSchedulerEvent{
- event: RegisterSchedulerSucceed,
- })
+ // step 2: recover existing allocations
+ // this step, we collect all existing allocations (allocated pods) from
api-server,
+ // rerun the scheduling for these allocations in order to restore
scheduler-state,
+ // the rerun is like a replay, not a actual scheduling procedure.
+ recoverableAppManagers := make([]interfaces.Recoverable, 0)
+ for _, appMgr := range ss.appManager.GetAllManagers() {
+ if m, ok := appMgr.(interfaces.Recoverable); ok {
+ recoverableAppManagers = append(recoverableAppManagers,
m)
+ }
}
-}
-
-func (ss *KubernetesShim) handleSchedulerFailure() {
- ss.Stop()
- // testmode will be true when mock scheduler intailize
- if !conf.GetSchedulerConf().IsTestMode() {
- os.Exit(1)
+ if err := ss.context.WaitForRecovery(recoverableAppManagers,
5*time.Minute); err != nil {
+ // failed
+ log.Log(log.ShimScheduler).Error("scheduler recovery failed",
zap.Error(err))
+ return err
}
-}
-
-func (ss *KubernetesShim) triggerSchedulerStateRecovery() {
- dispatcher.Dispatch(ShimSchedulerEvent{
- event: RecoverScheduler,
- })
-}
-func (ss *KubernetesShim) recoverSchedulerState() {
- // run recovery process in a go routine
- // do not block main thread
- go func() {
- log.Log(log.ShimScheduler).Info("recovering scheduler states")
- // step 1: recover all applications
- // this step, we collect all the existing allocated pods from
api-server,
- // identify the scheduling identity (aka applicationInfo) from
the pod,
- // and then add these applications to the scheduler.
- if err := ss.appManager.WaitForRecovery(); err != nil {
- // failed
- log.Log(log.ShimScheduler).Error("scheduler recovery
failed", zap.Error(err))
- dispatcher.Dispatch(ShimSchedulerEvent{
- event: RecoverSchedulerFailed,
- })
- return
- }
-
- // step 2: recover existing allocations
- // this step, we collect all existing allocations (allocated
pods) from api-server,
- // rerun the scheduling for these allocations in order to
restore scheduler-state,
- // the rerun is like a replay, not a actual scheduling
procedure.
- recoverableAppManagers := make([]interfaces.Recoverable, 0)
- for _, appMgr := range ss.appManager.GetAllManagers() {
- if m, ok := appMgr.(interfaces.Recoverable); ok {
- recoverableAppManagers =
append(recoverableAppManagers, m)
- }
- }
- if err := ss.context.WaitForRecovery(recoverableAppManagers,
5*time.Minute); err != nil {
- // failed
- log.Log(log.ShimScheduler).Error("scheduler recovery
failed", zap.Error(err))
- dispatcher.Dispatch(ShimSchedulerEvent{
- event: RecoverSchedulerFailed,
- })
- return
- }
-
- // success
- log.Log(log.ShimScheduler).Info("scheduler recovery succeed")
- dispatcher.Dispatch(ShimSchedulerEvent{
- event: RecoverSchedulerSucceed,
- })
- }()
+ // success
+ log.Log(log.ShimScheduler).Info("scheduler recovery succeed")
+ return nil
}
func (ss *KubernetesShim) doScheduling() {
@@ -243,27 +184,6 @@ func (ss *KubernetesShim) registerShimLayer() error {
return nil
}
-func (ss *KubernetesShim) GetSchedulerState() string {
- return ss.stateMachine.Current()
-}
-
-// event handling
-func (ss *KubernetesShim) handle(se events.SchedulerEvent) error {
- ss.lock.Lock()
- defer ss.lock.Unlock()
- err := ss.stateMachine.Event(context.Background(), se.GetEvent(), ss)
- if err != nil && err.Error() == "no transition" {
- return err
- }
- return nil
-}
-
-func (ss *KubernetesShim) canHandle(se events.SchedulerEvent) bool {
- ss.lock.RLock()
- defer ss.lock.RUnlock()
- return ss.stateMachine.Can(se.GetEvent())
-}
-
// each schedule iteration, we scan all apps and triggers app state transition
func (ss *KubernetesShim) schedule() {
apps := ss.context.GetAllApplications()
@@ -274,7 +194,7 @@ func (ss *KubernetesShim) schedule() {
}
}
-func (ss *KubernetesShim) Run() {
+func (ss *KubernetesShim) Run() error {
// NOTE: the order of starting these services matter,
// please look at the comments before modifying the orders
@@ -289,19 +209,33 @@ func (ss *KubernetesShim) Run() {
// run the client library code that communicates with Kubernetes
ss.apiFactory.Start()
- // register scheduler with scheduler core
- // this triggers the scheduler state transition
- // it first registers with the core, then start to do recovery,
- // after the recovery is succeed, it goes to the normal scheduling
routine
- dispatcher.Dispatch(newRegisterSchedulerEvent())
+ // register shim with core
+ if err := ss.registerShimLayer(); err != nil {
+ log.Log(log.ShimScheduler).Error("failed to register shim with
core", zap.Error(err))
+ ss.Stop()
+ return err
+ }
// run app managers
// the app manager launches the pod event handlers
// it needs to be started after the shim is registered with the core
if err := ss.appManager.Start(); err != nil {
- log.Log(log.ShimScheduler).Fatal("failed to start app manager",
zap.Error(err))
+ log.Log(log.ShimScheduler).Error("failed to start app manager",
zap.Error(err))
+ ss.Stop()
+ return err
+ }
+
+ // recover scheduler state
+ if err := ss.recoverSchedulerState(); err != nil {
+ log.Log(log.ShimScheduler).Error("failed to recover scheduler
state", zap.Error(err))
ss.Stop()
+ return err
}
+
+ // start scheduling loop
+ ss.doScheduling()
+
+ return nil
}
func (ss *KubernetesShim) Stop() {
diff --git a/pkg/shim/scheduler_graphviz_test.go
b/pkg/shim/scheduler_graphviz_test.go
deleted file mode 100644
index 7f2da073..00000000
--- a/pkg/shim/scheduler_graphviz_test.go
+++ /dev/null
@@ -1,61 +0,0 @@
-//go:build graphviz
-// +build graphviz
-
-/*
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
-*/
-
-package shim
-
-import (
- "fmt"
- "os"
- "testing"
-
- "github.com/looplab/fsm"
- "gotest.tools/v3/assert"
-
- "github.com/apache/yunikorn-k8shim/pkg/appmgmt"
- "github.com/apache/yunikorn-k8shim/pkg/cache"
- "github.com/apache/yunikorn-k8shim/pkg/client"
- "github.com/apache/yunikorn-k8shim/pkg/common/test"
- "github.com/apache/yunikorn-scheduler-interface/lib/go/api"
- "github.com/apache/yunikorn-scheduler-interface/lib/go/si"
-)
-
-func TestSchedulerFsmGraph(t *testing.T) {
- var callback api.ResourceManagerCallback
-
- mockedAMProtocol := cache.NewMockedAMProtocol()
- mockedAPIProvider := client.NewMockedAPIProvider(false)
- mockedAPIProvider.GetAPIs().SchedulerAPI =
test.NewSchedulerAPIMock().RegisterFunction(
- func(request *si.RegisterResourceManagerRequest,
- callback api.ResourceManagerCallback) (response
*si.RegisterResourceManagerResponse, e error) {
- return nil, fmt.Errorf("some error")
- })
-
- ctx := cache.NewContext(mockedAPIProvider)
- shim := newShimSchedulerInternal(ctx, mockedAPIProvider,
- appmgmt.NewAMService(mockedAMProtocol, mockedAPIProvider),
callback)
-
- graph := fsm.Visualize(shim.stateMachine)
-
- err := os.MkdirAll("../../build/fsm", 0755)
- assert.NilError(t, err, "Creating output dir failed")
- os.WriteFile("../../build/fsm/k8shim-scheduler-state.dot",
[]byte(graph), 0644)
- assert.NilError(t, err, "Writing graph failed")
-}
diff --git a/pkg/shim/scheduler_mock_test.go b/pkg/shim/scheduler_mock_test.go
index f430bdc3..51e9cde1 100644
--- a/pkg/shim/scheduler_mock_test.go
+++ b/pkg/shim/scheduler_mock_test.go
@@ -81,10 +81,14 @@ func (fc *MockScheduler) init() {
fc.apiProvider = mockedAPIProvider
}
-func (fc *MockScheduler) start() {
+func (fc *MockScheduler) start() error {
fc.apiProvider.RunEventHandler() // must be called first
- fc.scheduler.Run()
+ if err := fc.scheduler.Run(); err != nil {
+ fc.started.Store(false)
+ return err
+ }
fc.started.Store(true)
+ return nil
}
func (fc *MockScheduler) updateConfig(queues string, extraConfig
map[string]string) error {
@@ -172,23 +176,6 @@ func (fc *MockScheduler) addTask(appID string, taskID
string, ask *si.Resource)
})
}
-func (fc *MockScheduler) waitForSchedulerState(t testing.TB, expectedState
string) {
- deadline := time.Now().Add(10 * time.Second)
- for {
- if fc.scheduler.GetSchedulerState() == expectedState {
- break
- }
- log.Log(log.Test).Info("waiting for scheduler state",
- zap.String("expected", expectedState),
- zap.String("actual", fc.scheduler.GetSchedulerState()))
- time.Sleep(time.Second)
- if time.Now().After(deadline) {
- t.Errorf("wait for scheduler to reach state %s failed,
current state %s",
- expectedState, fc.scheduler.GetSchedulerState())
- }
- }
-}
-
func (fc *MockScheduler) waitAndAssertApplicationState(t *testing.T, appID,
expectedState string) {
app := fc.context.GetApplication(appID)
assert.Equal(t, app != nil, true)
diff --git a/pkg/shim/scheduler_perf_test.go b/pkg/shim/scheduler_perf_test.go
index 53c057bb..9c1afcae 100644
--- a/pkg/shim/scheduler_perf_test.go
+++ b/pkg/shim/scheduler_perf_test.go
@@ -80,7 +80,7 @@ func BenchmarkSchedulingThroughPut(b *testing.B) {
cluster := &MockScheduler{}
cluster.init()
- cluster.start()
+ assert.NilError(b, cluster.start(), "failed to initialize cluster")
defer cluster.stop()
if profileCpu {
@@ -101,8 +101,7 @@ func BenchmarkSchedulingThroughPut(b *testing.B) {
}(f)
}
- // init scheduler & update config
- cluster.waitForSchedulerState(b, SchedulerStates().Running)
+ // update config
err := cluster.updateConfig(queueConfig, map[string]string{
"log.level": "WARN",
})
diff --git a/pkg/shim/scheduler_state.go b/pkg/shim/scheduler_state.go
deleted file mode 100644
index dcce2735..00000000
--- a/pkg/shim/scheduler_state.go
+++ /dev/null
@@ -1,183 +0,0 @@
-/*
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
-*/
-
-package shim
-
-import (
- "context"
- "sync"
-
- "github.com/looplab/fsm"
- "go.uber.org/zap"
-
- "github.com/apache/yunikorn-k8shim/pkg/common/events"
- "github.com/apache/yunikorn-k8shim/pkg/log"
-)
-
-var schedulerStatesOnce sync.Once
-
-// ----------------------------------------------
-// Scheduler events
-// ----------------------------------------------
-type SchedulerEventType int
-
-const (
- RegisterScheduler SchedulerEventType = iota
- RegisterSchedulerSucceed
- RegisterSchedulerFailed
- RecoverScheduler
- RecoverSchedulerSucceed
- RecoverSchedulerFailed
-)
-
-func (ae SchedulerEventType) String() string {
- return [...]string{"RegisterScheduler", "RegisterSchedulerSucceed",
"RegisterSchedulerFailed", "RecoverScheduler", "RecoverSchedulerSucceed",
"RecoverSchedulerFailed"}[ae]
-}
-
-type ShimSchedulerEvent struct { //nolint:golint
- event SchedulerEventType
-}
-
-func (rs ShimSchedulerEvent) GetEvent() string {
- return rs.event.String()
-}
-
-func (rs ShimSchedulerEvent) GetArgs() []interface{} {
- return nil
-}
-
-// -------------------------------------------------------------------
-// event to trigger scheduler registration
-// --------------------------------------------------------------------
-type RegisterSchedulerEvent struct {
- event SchedulerEventType
-}
-
-func newRegisterSchedulerEvent() RegisterSchedulerEvent {
- return RegisterSchedulerEvent{
- event: RegisterScheduler,
- }
-}
-
-func (rs RegisterSchedulerEvent) GetEvent() string {
- return rs.event.String()
-}
-
-func (rs RegisterSchedulerEvent) GetArgs() []interface{} {
- return nil
-}
-
-// ----------------------------------
-// Scheduler states
-// ----------------------------------
-var storeScheduleStates *SStates
-
-type SStates struct {
- New string
- Registered string
- Registering string
- Recovering string
- Running string
- Draining string
- Stopped string
-}
-
-func SchedulerStates() *SStates {
- schedulerStatesOnce.Do(func() {
- storeScheduleStates = &SStates{
- New: "New",
- Registered: "Registered",
- Registering: "Registering",
- Recovering: "Recovering",
- Running: "Running",
- Draining: "Draining",
- Stopped: "Stopped",
- }
- })
- return storeScheduleStates
-}
-
-func newSchedulerState() *fsm.FSM {
- states := SchedulerStates()
- return fsm.NewFSM(
- states.New, fsm.Events{
- {
- Name: RegisterScheduler.String(),
- Src: []string{states.New},
- Dst: states.Registering,
- },
- {
- Name: RegisterSchedulerSucceed.String(),
- Src: []string{states.Registering},
- Dst: states.Registered,
- },
- {
- Name: RegisterSchedulerFailed.String(),
- Src: []string{states.Registering},
- Dst: states.Stopped,
- },
- {
- Name: RecoverScheduler.String(),
- Src: []string{states.Registered},
- Dst: states.Recovering,
- },
- {
- Name: RecoverSchedulerSucceed.String(),
- Src: []string{states.Recovering},
- Dst: states.Running,
- },
- {
- Name: RecoverSchedulerFailed.String(),
- Src: []string{states.Recovering},
- Dst: states.Stopped,
- },
- },
- fsm.Callbacks{
- events.EnterState: func(_ context.Context, event
*fsm.Event) {
- log.Log(log.ShimFSM).Debug("scheduler shim
state transition",
- zap.String("source", event.Src),
- zap.String("destination", event.Dst),
- zap.String("event", event.Event))
- },
- states.Registered: func(_ context.Context, event
*fsm.Event) {
- scheduler := event.Args[0].(*KubernetesShim)
//nolint:errcheck
- scheduler.triggerSchedulerStateRecovery() //
if reaches registered, trigger recovering
- },
- states.Recovering: func(_ context.Context, event
*fsm.Event) {
- scheduler := event.Args[0].(*KubernetesShim)
//nolint:errcheck
- scheduler.recoverSchedulerState() //
do recovering
- },
- states.Running: func(_ context.Context, event
*fsm.Event) {
- scheduler := event.Args[0].(*KubernetesShim)
//nolint:errcheck
- scheduler.doScheduling() //
do scheduling
- },
- RegisterScheduler.String(): func(_ context.Context,
event *fsm.Event) {
- scheduler := event.Args[0].(*KubernetesShim)
//nolint:errcheck
- scheduler.register() //
trigger registration
- },
- RegisterSchedulerFailed.String(): func(_
context.Context, event *fsm.Event) {
- scheduler := event.Args[0].(*KubernetesShim)
//nolint:errcheck
- scheduler.handleSchedulerFailure() //
registration failed, stop the scheduler
- },
- RecoverSchedulerFailed.String(): func(_
context.Context, event *fsm.Event) {
- scheduler := event.Args[0].(*KubernetesShim)
//nolint:errcheck
- scheduler.handleSchedulerFailure() //
recovery failed
- },
- },
- )
-}
diff --git a/pkg/shim/scheduler_test.go b/pkg/shim/scheduler_test.go
index 4f563465..f8387c6a 100644
--- a/pkg/shim/scheduler_test.go
+++ b/pkg/shim/scheduler_test.go
@@ -21,9 +21,7 @@ package shim
import (
"fmt"
"testing"
- "time"
- "go.uber.org/zap"
"gotest.tools/v3/assert"
v1 "k8s.io/api/core/v1"
@@ -32,7 +30,6 @@ import (
"github.com/apache/yunikorn-k8shim/pkg/client"
"github.com/apache/yunikorn-k8shim/pkg/common"
"github.com/apache/yunikorn-k8shim/pkg/common/test"
- "github.com/apache/yunikorn-k8shim/pkg/log"
"github.com/apache/yunikorn-scheduler-interface/lib/go/api"
siCommon "github.com/apache/yunikorn-scheduler-interface/lib/go/common"
"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
@@ -58,11 +55,9 @@ partitions:
// init and register scheduler
cluster := MockScheduler{}
cluster.init()
- cluster.start()
+ assert.NilError(t, cluster.start(), "failed to start cluster")
defer cluster.stop()
- // ensure scheduler running
- cluster.waitForSchedulerState(t, SchedulerStates().Running)
err := cluster.updateConfig(configData, nil)
assert.NilError(t, err, "update config failed")
nodeLabels := map[string]string{
@@ -112,11 +107,9 @@ partitions:
// init and register scheduler
cluster := MockScheduler{}
cluster.init()
- cluster.start()
+ assert.NilError(t, cluster.start(), "failed to start cluster")
defer cluster.stop()
- // ensure scheduler state
- cluster.waitForSchedulerState(t, SchedulerStates().Running)
err := cluster.updateConfig(configData, nil)
assert.NilError(t, err, "update config failed")
@@ -173,11 +166,8 @@ func TestSchedulerRegistrationFailed(t *testing.T) {
ctx := cache.NewContext(mockedAPIProvider)
shim := newShimSchedulerInternal(ctx, mockedAPIProvider,
appmgmt.NewAMService(mockedAMProtocol, mockedAPIProvider),
callback)
- shim.Run()
- defer shim.Stop()
-
- err := waitShimSchedulerState(shim, SchedulerStates().Stopped,
5*time.Second)
- assert.NilError(t, err)
+ assert.Error(t, shim.Run(), "some error")
+ shim.Stop()
}
func TestTaskFailures(t *testing.T) {
@@ -203,7 +193,7 @@ partitions:
// init and register scheduler
cluster := MockScheduler{}
cluster.init()
- cluster.start()
+ assert.NilError(t, cluster.start(), "failed to start cluster")
defer cluster.stop()
// mock pod bind failures
@@ -214,8 +204,6 @@ partitions:
return nil
})
- // ensure scheduler state
- cluster.waitForSchedulerState(t, SchedulerStates().Running)
err := cluster.updateConfig(configData, nil)
assert.NilError(t, err, "update config failed")
@@ -248,20 +236,3 @@ partitions:
"[mycluster]default", "app0001", 1)
assert.NilError(t, err, "number of allocations is not expected, error")
}
-
-func waitShimSchedulerState(shim *KubernetesShim, expectedState string,
timeout time.Duration) error {
- deadline := time.Now().Add(timeout)
- for {
- if shim.GetSchedulerState() == expectedState {
- log.Log(log.Test).Info("waiting for state",
- zap.String("expect", expectedState),
- zap.String("current", shim.GetSchedulerState()))
- return nil
- }
- time.Sleep(1 * time.Second)
- if time.Now().After(deadline) {
- return fmt.Errorf("scheduler has not reached expected
state %s in %d seconds, current state: %s",
- expectedState, deadline.Second(),
shim.GetSchedulerState())
- }
- }
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]