This is an automated email from the ASF dual-hosted git repository.
ccondit pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-k8shim.git
The following commit(s) were added to refs/heads/master by this push:
new 64b204a2 [YUNIKORN-2539] Shim: Add deadlock tracking feature (#814)
64b204a2 is described below
commit 64b204a2fb3b83fde9d86ea58f5f0d1e42187472
Author: Craig Condit <[email protected]>
AuthorDate: Fri Apr 5 12:06:45 2024 -0500
[YUNIKORN-2539] Shim: Add deadlock tracking feature (#814)
Replaces sync.{RW}Mutex with internal locking.{RW}Mutex implementations.
The new implementation wraps the go-deadlock library with logic to
conditionally enable deadlock detection, controlled by the following
environment variables:
To enable the feature:
- DEADLOCK_DETECTION_ENABLED=true
To customize the timeout before potential deadlocks are logged (default
is 60 seconds):
- DEADLOCK_TIMEOUT_SECONDS=60
See https://github.com/sasha-s/go-deadlock for more details.
Closes: #814
---
go.mod | 5 ++-
go.sum | 8 +++-
pkg/admission/conf/am_conf.go | 4 +-
pkg/admission/namespace_cache.go | 5 +--
pkg/admission/priority_class_cache.go | 5 +--
pkg/admission/webhook_manager.go | 4 +-
pkg/cache/application.go | 6 +--
pkg/cache/application_test.go | 8 ++--
pkg/cache/context.go | 5 ++-
pkg/cache/external/scheduler_cache.go | 4 +-
pkg/cache/placeholder_manager.go | 6 +--
pkg/cache/task.go | 6 +--
pkg/cache/task_test.go | 5 ++-
pkg/client/apifactory.go | 6 +--
pkg/client/apifactory_mock.go | 4 +-
pkg/client/kubeclient_mock.go | 6 +--
pkg/cmd/admissioncontroller/main.go | 4 +-
pkg/common/events/recorder.go | 3 +-
pkg/common/test/schedulerapi_mock.go | 6 +--
pkg/conf/schedulerconf.go | 4 +-
pkg/dispatcher/dispatch_test.go | 8 ++--
pkg/dispatcher/dispatcher.go | 5 ++-
pkg/locking/locking.go | 44 +++++++++++++++++++
pkg/locking/locking_test.go | 67 +++++++++++++++++++++++++++++
pkg/plugin/scheduler_plugin.go | 4 +-
pkg/shim/scheduler.go | 6 +--
pkg/shim/scheduler_perf_test.go | 4 +-
test/e2e/framework/helpers/k8s/k8s_utils.go | 4 +-
28 files changed, 184 insertions(+), 62 deletions(-)
diff --git a/go.mod b/go.mod
index 9498a248..b5677195 100644
--- a/go.mod
+++ b/go.mod
@@ -21,7 +21,7 @@ module github.com/apache/yunikorn-k8shim
go 1.21
require (
- github.com/apache/yunikorn-core v0.0.0-20240402212227-bdf109b5432c
+ github.com/apache/yunikorn-core v0.0.0-20240405153113-5758d7ac3c85
github.com/apache/yunikorn-scheduler-interface
v0.0.0-20240402211642-e7421a4261fd
github.com/google/go-cmp v0.6.0
github.com/google/uuid v1.6.0
@@ -98,10 +98,12 @@ require (
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/selinux v1.11.0 // indirect
github.com/peterbourgon/diskv v2.0.1+incompatible // indirect
+ github.com/petermattis/goid v0.0.0-20240327183114-c42a807a84ba //
indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_model v0.5.0 // indirect
github.com/prometheus/common v0.45.0 // indirect
github.com/prometheus/procfs v0.12.0 // indirect
+ github.com/sasha-s/go-deadlock v0.3.1 // indirect
github.com/spf13/cobra v1.7.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/stoewer/go-strcase v1.2.0 // indirect
@@ -161,6 +163,7 @@ require (
replace (
github.com/opencontainers/runc => github.com/opencontainers/runc v1.1.12
+ github.com/petermattis/goid => github.com/petermattis/goid
v0.0.0-20240327183114-c42a807a84ba
golang.org/x/crypto => golang.org/x/crypto v0.19.0
golang.org/x/lint => golang.org/x/lint
v0.0.0-20210508222113-6edffad5e616
golang.org/x/net => golang.org/x/net v0.21.0
diff --git a/go.sum b/go.sum
index 6c2622fb..fad0a064 100644
--- a/go.sum
+++ b/go.sum
@@ -9,8 +9,8 @@ github.com/NYTimes/gziphandler v1.1.1
h1:ZUDjpQae29j0ryrS0u/B8HZfJBtBQHjqw2rQ2cq
github.com/NYTimes/gziphandler v1.1.1/go.mod
h1:n/CVRwUEOgIxrgPvAQhUUr9oeUtvrhMomdKFjzJNB0c=
github.com/antlr/antlr4/runtime/Go/antlr/v4 v4.0.0-20230305170008-8188dc5388df
h1:7RFfzj4SSt6nnvCPbCqijJi1nWCd+TqAT3bYCStRC18=
github.com/antlr/antlr4/runtime/Go/antlr/v4
v4.0.0-20230305170008-8188dc5388df/go.mod
h1:pSwJ0fSY5KhvocuWSx4fz3BA8OrA1bQn+K1Eli3BRwM=
-github.com/apache/yunikorn-core v0.0.0-20240402212227-bdf109b5432c
h1:WoO71GKblZEKBOuWviJMD5f1W6tdbJp5Pv/utd4zYqw=
-github.com/apache/yunikorn-core v0.0.0-20240402212227-bdf109b5432c/go.mod
h1:RZCBSMe6UZ04b45ZzwvuhhkY2f7f8ZW7ERvVMUM6dy4=
+github.com/apache/yunikorn-core v0.0.0-20240405153113-5758d7ac3c85
h1:bPbrFZc+qgsepdJXclpgwGLYEPeKSL6W69i+RUjPc6o=
+github.com/apache/yunikorn-core v0.0.0-20240405153113-5758d7ac3c85/go.mod
h1:DnScYvh1qQ7v89tebVH43LcuEDoUXLy2wm8aE4Co75Y=
github.com/apache/yunikorn-scheduler-interface
v0.0.0-20240402211642-e7421a4261fd
h1:uNOijHkCotZLUZ+A85NSftEJGfP50Opf7ms6Daj6pco=
github.com/apache/yunikorn-scheduler-interface
v0.0.0-20240402211642-e7421a4261fd/go.mod
h1:0f4l3ManMROX60xU7GbhejCEYYyMksH275oY2xIVkbM=
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5
h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio=
@@ -175,6 +175,8 @@ github.com/opencontainers/selinux v1.11.0
h1:+5Zbo97w3Lbmb3PeqQtpmTkMwsW5nRI3YaL
github.com/opencontainers/selinux v1.11.0/go.mod
h1:E5dMC3VPuVvVHDYmi78qvhJp8+M586T4DlDRYpFkyec=
github.com/peterbourgon/diskv v2.0.1+incompatible
h1:UBdAOUP5p4RWqPBg048CAvpKN+vxiaj6gdUUzhl4XmI=
github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod
h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU=
+github.com/petermattis/goid v0.0.0-20240327183114-c42a807a84ba
h1:3jPgmsFGBID1wFfU2AbYocNcN4wqU68UaHSdMjiw/7U=
+github.com/petermattis/goid v0.0.0-20240327183114-c42a807a84ba/go.mod
h1:pxMtw7cyUw6B2bRH0ZBANSPg+AoSud1I1iyJHI69jH4=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod
h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0
h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
@@ -190,6 +192,8 @@ github.com/prometheus/procfs v0.12.0/go.mod
h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3c
github.com/rogpeppe/go-internal v1.10.0
h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
github.com/rogpeppe/go-internal v1.10.0/go.mod
h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog=
github.com/russross/blackfriday/v2 v2.1.0/go.mod
h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
+github.com/sasha-s/go-deadlock v0.3.1
h1:sqv7fDNShgjcaxkO0JNcOAlr8B9+cV5Ey/OB71efZx0=
+github.com/sasha-s/go-deadlock v0.3.1/go.mod
h1:F73l+cr82YSh10GxyRI6qZiCgK64VaZjwesgfQ1/iLM=
github.com/sergi/go-diff v1.1.0 h1:we8PVUC3FE2uYfodKH/nBHMSetSfHDR6scGdBi+erh0=
github.com/sergi/go-diff v1.1.0/go.mod
h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM=
github.com/sirupsen/logrus v1.9.0
h1:trlNQbNUG3OdDrDil03MCb1H2o9nJ1x4/5LYw7byDE0=
diff --git a/pkg/admission/conf/am_conf.go b/pkg/admission/conf/am_conf.go
index 529cd2e8..154df9ce 100644
--- a/pkg/admission/conf/am_conf.go
+++ b/pkg/admission/conf/am_conf.go
@@ -23,7 +23,6 @@ import (
"regexp"
"strconv"
"strings"
- "sync"
"go.uber.org/zap"
v1 "k8s.io/api/core/v1"
@@ -33,6 +32,7 @@ import (
"github.com/apache/yunikorn-k8shim/pkg/common/constants"
"github.com/apache/yunikorn-k8shim/pkg/common/utils"
schedulerconf "github.com/apache/yunikorn-k8shim/pkg/conf"
+ "github.com/apache/yunikorn-k8shim/pkg/locking"
"github.com/apache/yunikorn-k8shim/pkg/log"
)
@@ -105,7 +105,7 @@ type AdmissionControllerConf struct {
defaultQueueName string
configMaps []*v1.ConfigMap
- lock sync.RWMutex
+ lock locking.RWMutex
}
func NewAdmissionControllerConf(configMaps []*v1.ConfigMap)
*AdmissionControllerConf {
diff --git a/pkg/admission/namespace_cache.go b/pkg/admission/namespace_cache.go
index 4a66f073..c5ff8065 100644
--- a/pkg/admission/namespace_cache.go
+++ b/pkg/admission/namespace_cache.go
@@ -19,20 +19,19 @@
package admission
import (
- "sync"
-
v1 "k8s.io/api/core/v1"
informersv1 "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/tools/cache"
"github.com/apache/yunikorn-k8shim/pkg/common/constants"
+ "github.com/apache/yunikorn-k8shim/pkg/locking"
"github.com/apache/yunikorn-k8shim/pkg/log"
)
type NamespaceCache struct {
nameSpaces map[string]nsFlags
- sync.RWMutex
+ locking.RWMutex
}
type triState int
diff --git a/pkg/admission/priority_class_cache.go
b/pkg/admission/priority_class_cache.go
index 58bec607..b640ce83 100644
--- a/pkg/admission/priority_class_cache.go
+++ b/pkg/admission/priority_class_cache.go
@@ -19,21 +19,20 @@
package admission
import (
- "sync"
-
schedulingv1 "k8s.io/api/scheduling/v1"
informersv1 "k8s.io/client-go/informers/scheduling/v1"
"k8s.io/client-go/tools/cache"
"github.com/apache/yunikorn-k8shim/pkg/common/constants"
"github.com/apache/yunikorn-k8shim/pkg/common/utils"
+ "github.com/apache/yunikorn-k8shim/pkg/locking"
"github.com/apache/yunikorn-k8shim/pkg/log"
)
type PriorityClassCache struct {
priorityClasses map[string]bool
- sync.RWMutex
+ locking.RWMutex
}
// NewPriorityClassCache creates a new cache and registers the handler for the
cache with the Informer.
diff --git a/pkg/admission/webhook_manager.go b/pkg/admission/webhook_manager.go
index 5a419fc3..2d50404a 100644
--- a/pkg/admission/webhook_manager.go
+++ b/pkg/admission/webhook_manager.go
@@ -25,7 +25,6 @@ import (
"crypto/x509"
"errors"
"fmt"
- "sync"
"time"
"go.uber.org/zap"
@@ -37,6 +36,7 @@ import (
"github.com/apache/yunikorn-k8shim/pkg/admission/conf"
"github.com/apache/yunikorn-k8shim/pkg/admission/pki"
"github.com/apache/yunikorn-k8shim/pkg/client"
+ "github.com/apache/yunikorn-k8shim/pkg/locking"
"github.com/apache/yunikorn-k8shim/pkg/log"
)
@@ -81,7 +81,7 @@ type webhookManagerImpl struct {
caKey2 *rsa.PrivateKey
expiration time.Time
- sync.RWMutex
+ locking.RWMutex
}
// NewWebhookManager is used to create a new webhook manager
diff --git a/pkg/cache/application.go b/pkg/cache/application.go
index 06295b52..f43d97f4 100644
--- a/pkg/cache/application.go
+++ b/pkg/cache/application.go
@@ -23,7 +23,6 @@ import (
"fmt"
"sort"
"strings"
- "sync"
"github.com/looplab/fsm"
"go.uber.org/zap"
@@ -35,6 +34,7 @@ import (
"github.com/apache/yunikorn-k8shim/pkg/common/events"
"github.com/apache/yunikorn-k8shim/pkg/conf"
"github.com/apache/yunikorn-k8shim/pkg/dispatcher"
+ "github.com/apache/yunikorn-k8shim/pkg/locking"
"github.com/apache/yunikorn-k8shim/pkg/log"
"github.com/apache/yunikorn-scheduler-interface/lib/go/api"
"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
@@ -53,7 +53,7 @@ type Application struct {
schedulingParamsDefinition string
placeholderOwnerReferences []metav1.OwnerReference
sm *fsm.FSM
- lock *sync.RWMutex
+ lock *locking.RWMutex
schedulerAPI api.SchedulerAPI
placeholderAsk *si.Resource // total placeholder request
for the app (all task groups)
placeholderTimeoutInSec int64
@@ -81,7 +81,7 @@ func NewApplication(appID, queueName, user string, groups
[]string, tags map[str
tags: tags,
sm: newAppState(),
taskGroups: make([]TaskGroup, 0),
- lock: &sync.RWMutex{},
+ lock: &locking.RWMutex{},
schedulerAPI: scheduler,
placeholderTimeoutInSec: 0,
schedulingStyle:
constants.SchedulingPolicyStyleParamDefault,
diff --git a/pkg/cache/application_test.go b/pkg/cache/application_test.go
index edd3846f..874a6e14 100644
--- a/pkg/cache/application_test.go
+++ b/pkg/cache/application_test.go
@@ -22,7 +22,6 @@ import (
"fmt"
"sort"
"strings"
- "sync"
"testing"
"time"
@@ -41,6 +40,7 @@ import (
"github.com/apache/yunikorn-k8shim/pkg/common/utils"
"github.com/apache/yunikorn-k8shim/pkg/conf"
"github.com/apache/yunikorn-k8shim/pkg/dispatcher"
+ "github.com/apache/yunikorn-k8shim/pkg/locking"
"github.com/apache/yunikorn-scheduler-interface/lib/go/api"
siCommon "github.com/apache/yunikorn-scheduler-interface/lib/go/common"
"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
@@ -48,7 +48,7 @@ import (
type recorderTime struct {
time int64
- lock *sync.RWMutex
+ lock *locking.RWMutex
}
func TestNewApplication(t *testing.T) {
@@ -129,7 +129,7 @@ func TestFailApplication(t *testing.T) {
rt := &recorderTime{
time: int64(0),
- lock: &sync.RWMutex{},
+ lock: &locking.RWMutex{},
}
ms := &mockSchedulerAPI{}
// set test mode
@@ -662,7 +662,7 @@ func TestSetTaskGroupsAndSchedulingPolicy(t *testing.T) {
type threadSafePodsMap struct {
pods map[string]*v1.Pod
- sync.RWMutex
+ locking.RWMutex
}
func newThreadSafePodsMap() *threadSafePodsMap {
diff --git a/pkg/cache/context.go b/pkg/cache/context.go
index 8e003336..c6caea06 100644
--- a/pkg/cache/context.go
+++ b/pkg/cache/context.go
@@ -46,6 +46,7 @@ import (
"github.com/apache/yunikorn-k8shim/pkg/common/utils"
schedulerconf "github.com/apache/yunikorn-k8shim/pkg/conf"
"github.com/apache/yunikorn-k8shim/pkg/dispatcher"
+ "github.com/apache/yunikorn-k8shim/pkg/locking"
"github.com/apache/yunikorn-k8shim/pkg/log"
"github.com/apache/yunikorn-k8shim/pkg/plugin/predicates"
"github.com/apache/yunikorn-k8shim/pkg/plugin/support"
@@ -64,7 +65,7 @@ type Context struct {
pluginMode bool // true if we are
configured as a scheduler plugin
namespace string // yunikorn namespace
configMaps []*v1.ConfigMap // cached yunikorn
configmaps
- lock *sync.RWMutex // lock
+ lock *locking.RWMutex // lock
txnID atomic.Uint64 // transaction ID counter
klogger klog.Logger
}
@@ -87,7 +88,7 @@ func NewContextWithBootstrapConfigMaps(apis
client.APIProvider, bootstrapConfigM
apiProvider: apis,
namespace: apis.GetAPIs().GetConf().Namespace,
configMaps: bootstrapConfigMaps,
- lock: &sync.RWMutex{},
+ lock: &locking.RWMutex{},
klogger: klog.NewKlogr(),
}
diff --git a/pkg/cache/external/scheduler_cache.go
b/pkg/cache/external/scheduler_cache.go
index 61d49b94..43958495 100644
--- a/pkg/cache/external/scheduler_cache.go
+++ b/pkg/cache/external/scheduler_cache.go
@@ -20,7 +20,6 @@ package external
import (
"fmt"
- "sync"
"sync/atomic"
"go.uber.org/zap"
@@ -35,6 +34,7 @@ import (
"github.com/apache/yunikorn-k8shim/pkg/client"
"github.com/apache/yunikorn-k8shim/pkg/common"
"github.com/apache/yunikorn-k8shim/pkg/common/utils"
+ "github.com/apache/yunikorn-k8shim/pkg/locking"
"github.com/apache/yunikorn-k8shim/pkg/log"
"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
)
@@ -70,7 +70,7 @@ type SchedulerCache struct {
inProgressAllocations map[string]string // map of pod to node ID,
presence indicates an in-process allocation for scheduler
schedulingTasks map[string]interface{} // list of task IDs which
are currently being processed by the scheduler
pvcRefCounts map[string]map[string]int
- lock sync.RWMutex
+ lock locking.RWMutex
clients *client.Clients // client APIs
klogger klog.Logger
diff --git a/pkg/cache/placeholder_manager.go b/pkg/cache/placeholder_manager.go
index 03113071..230c77c6 100644
--- a/pkg/cache/placeholder_manager.go
+++ b/pkg/cache/placeholder_manager.go
@@ -20,7 +20,6 @@ package cache
import (
"strings"
- "sync"
"sync/atomic"
"time"
@@ -28,6 +27,7 @@ import (
v1 "k8s.io/api/core/v1"
"github.com/apache/yunikorn-k8shim/pkg/client"
+ "github.com/apache/yunikorn-k8shim/pkg/locking"
"github.com/apache/yunikorn-k8shim/pkg/log"
)
@@ -43,12 +43,12 @@ type PlaceholderManager struct {
running atomic.Value
cleanupTime time.Duration
// a simple mutex will do we do not have separate read and write paths
- sync.RWMutex
+ locking.RWMutex
}
var (
placeholderMgr *PlaceholderManager
- mu sync.Mutex
+ mu locking.Mutex
)
func NewPlaceholderManager(clients *client.Clients) *PlaceholderManager {
diff --git a/pkg/cache/task.go b/pkg/cache/task.go
index 088c0ebd..bb54f4c0 100644
--- a/pkg/cache/task.go
+++ b/pkg/cache/task.go
@@ -22,7 +22,6 @@ import (
"context"
"fmt"
"strconv"
- "sync"
"time"
"github.com/looplab/fsm"
@@ -35,6 +34,7 @@ import (
"github.com/apache/yunikorn-k8shim/pkg/common/events"
"github.com/apache/yunikorn-k8shim/pkg/common/utils"
"github.com/apache/yunikorn-k8shim/pkg/dispatcher"
+ "github.com/apache/yunikorn-k8shim/pkg/locking"
"github.com/apache/yunikorn-k8shim/pkg/log"
"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
)
@@ -57,7 +57,7 @@ type Task struct {
originator bool
schedulingState TaskSchedulingState
sm *fsm.FSM
- lock *sync.RWMutex
+ lock *locking.RWMutex
}
func NewTask(tid string, app *Application, ctx *Context, pod *v1.Pod) *Task {
@@ -101,7 +101,7 @@ func createTaskInternal(tid string, app *Application,
resource *si.Resource,
context: ctx,
sm: newTaskState(),
schedulingState: TaskSchedPending,
- lock: &sync.RWMutex{},
+ lock: &locking.RWMutex{},
}
if tgName := utils.GetTaskGroupFromPodSpec(pod); tgName != "" {
task.taskGroupName = tgName
diff --git a/pkg/cache/task_test.go b/pkg/cache/task_test.go
index e36bc4da..c08c4f03 100644
--- a/pkg/cache/task_test.go
+++ b/pkg/cache/task_test.go
@@ -19,7 +19,6 @@
package cache
import (
- "sync"
"testing"
"time"
@@ -36,6 +35,8 @@ import (
"github.com/apache/yunikorn-k8shim/pkg/common/constants"
"github.com/apache/yunikorn-k8shim/pkg/common/events"
"github.com/apache/yunikorn-k8shim/pkg/conf"
+ "github.com/apache/yunikorn-k8shim/pkg/locking"
+
"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
)
@@ -523,7 +524,7 @@ func TestHandleSubmitTaskEvent(t *testing.T) {
mockedContext.addPriorityClass(priorityClass2)
rt := &recorderTime{
time: int64(0),
- lock: &sync.RWMutex{},
+ lock: &locking.RWMutex{},
}
conf.GetSchedulerConf().SetTestMode(true)
mr := events.NewMockedRecorder()
diff --git a/pkg/client/apifactory.go b/pkg/client/apifactory.go
index 9ceeb103..4efdefa3 100644
--- a/pkg/client/apifactory.go
+++ b/pkg/client/apifactory.go
@@ -19,7 +19,6 @@
package client
import (
- "sync"
"time"
"go.uber.org/zap"
@@ -29,6 +28,7 @@ import (
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumebinding"
"github.com/apache/yunikorn-k8shim/pkg/conf"
+ "github.com/apache/yunikorn-k8shim/pkg/locking"
"github.com/apache/yunikorn-k8shim/pkg/log"
"github.com/apache/yunikorn-scheduler-interface/lib/go/api"
)
@@ -76,7 +76,7 @@ type APIFactory struct {
clients *Clients
testMode bool
stopChan chan struct{}
- lock *sync.RWMutex
+ lock *locking.RWMutex
}
func NewAPIFactory(scheduler api.SchedulerAPI, informerFactory
informers.SharedInformerFactory, configs *conf.SchedulerConf, testMode bool)
*APIFactory {
@@ -130,7 +130,7 @@ func NewAPIFactory(scheduler api.SchedulerAPI,
informerFactory informers.SharedI
},
testMode: testMode,
stopChan: make(chan struct{}),
- lock: &sync.RWMutex{},
+ lock: &locking.RWMutex{},
}
}
diff --git a/pkg/client/apifactory_mock.go b/pkg/client/apifactory_mock.go
index 20c9a814..35decd4c 100644
--- a/pkg/client/apifactory_mock.go
+++ b/pkg/client/apifactory_mock.go
@@ -19,7 +19,6 @@
package client
import (
- "sync"
"time"
"go.uber.org/zap"
@@ -34,12 +33,13 @@ import (
"github.com/apache/yunikorn-k8shim/pkg/common/test"
"github.com/apache/yunikorn-k8shim/pkg/conf"
+ "github.com/apache/yunikorn-k8shim/pkg/locking"
"github.com/apache/yunikorn-k8shim/pkg/log"
"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
)
type MockedAPIProvider struct {
- sync.Mutex
+ locking.Mutex
clients *Clients
stop chan struct{}
diff --git a/pkg/client/kubeclient_mock.go b/pkg/client/kubeclient_mock.go
index bc890df2..3143f4d3 100644
--- a/pkg/client/kubeclient_mock.go
+++ b/pkg/client/kubeclient_mock.go
@@ -20,7 +20,6 @@ package client
import (
"fmt"
- "sync"
"time"
"go.uber.org/zap"
@@ -29,6 +28,7 @@ import (
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/rest"
+ "github.com/apache/yunikorn-k8shim/pkg/locking"
"github.com/apache/yunikorn-k8shim/pkg/log"
)
@@ -42,7 +42,7 @@ type KubeClientMock struct {
getFn func(podName string) (*v1.Pod, error)
clientSet kubernetes.Interface
pods map[string]*v1.Pod
- lock sync.RWMutex
+ lock locking.RWMutex
bindStats BindStats
boundPods []BoundPod
}
@@ -107,7 +107,7 @@ func NewKubeClientMock(err bool) *KubeClientMock {
},
clientSet: fake.NewSimpleClientset(),
pods: make(map[string]*v1.Pod),
- lock: sync.RWMutex{},
+ lock: locking.RWMutex{},
boundPods: make([]BoundPod, 0, 1024),
}
diff --git a/pkg/cmd/admissioncontroller/main.go
b/pkg/cmd/admissioncontroller/main.go
index e96dd1b0..815f9294 100644
--- a/pkg/cmd/admissioncontroller/main.go
+++ b/pkg/cmd/admissioncontroller/main.go
@@ -26,7 +26,6 @@ import (
"net/http"
"os"
"os/signal"
- "sync"
"syscall"
"go.uber.org/zap"
@@ -34,6 +33,7 @@ import (
"github.com/apache/yunikorn-k8shim/pkg/admission"
"github.com/apache/yunikorn-k8shim/pkg/admission/conf"
"github.com/apache/yunikorn-k8shim/pkg/client"
+ "github.com/apache/yunikorn-k8shim/pkg/locking"
"github.com/apache/yunikorn-k8shim/pkg/log"
)
@@ -48,7 +48,7 @@ type WebHook struct {
ac *admission.AdmissionController
port int
server *http.Server
- sync.Mutex
+ locking.Mutex
}
func main() {
diff --git a/pkg/common/events/recorder.go b/pkg/common/events/recorder.go
index 81d9831c..161c0eef 100644
--- a/pkg/common/events/recorder.go
+++ b/pkg/common/events/recorder.go
@@ -27,11 +27,12 @@ import (
"github.com/apache/yunikorn-k8shim/pkg/client"
"github.com/apache/yunikorn-k8shim/pkg/common/constants"
"github.com/apache/yunikorn-k8shim/pkg/conf"
+ "github.com/apache/yunikorn-k8shim/pkg/locking"
)
var eventRecorder events.EventRecorder = events.NewFakeRecorder(1024)
var once sync.Once
-var lock sync.RWMutex
+var lock locking.RWMutex
func GetRecorder() events.EventRecorder {
lock.Lock()
diff --git a/pkg/common/test/schedulerapi_mock.go
b/pkg/common/test/schedulerapi_mock.go
index 0c2ce693..c0b781cb 100644
--- a/pkg/common/test/schedulerapi_mock.go
+++ b/pkg/common/test/schedulerapi_mock.go
@@ -19,9 +19,9 @@
package test
import (
- "sync"
"sync/atomic"
+ "github.com/apache/yunikorn-k8shim/pkg/locking"
"github.com/apache/yunikorn-scheduler-interface/lib/go/api"
"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
)
@@ -36,7 +36,7 @@ type SchedulerAPIMock struct {
UpdateAllocationFn func(request *si.AllocationRequest) error
UpdateApplicationFn func(request *si.ApplicationRequest) error
UpdateNodeFn func(request *si.NodeRequest) error
- lock sync.Mutex
+ lock locking.Mutex
}
func NewSchedulerAPIMock() *SchedulerAPIMock {
@@ -58,7 +58,7 @@ func NewSchedulerAPIMock() *SchedulerAPIMock {
UpdateNodeFn: func(request *si.NodeRequest) error {
return nil
},
- lock: sync.Mutex{},
+ lock: locking.Mutex{},
}
}
diff --git a/pkg/conf/schedulerconf.go b/pkg/conf/schedulerconf.go
index cd46c27e..da8336a0 100644
--- a/pkg/conf/schedulerconf.go
+++ b/pkg/conf/schedulerconf.go
@@ -39,6 +39,7 @@ import (
"k8s.io/klog/v2"
"github.com/apache/yunikorn-k8shim/pkg/common/constants"
+ "github.com/apache/yunikorn-k8shim/pkg/locking"
"github.com/apache/yunikorn-k8shim/pkg/log"
)
@@ -128,7 +129,8 @@ type SchedulerConf struct {
InstanceTypeNodeLabelKey string `json:"instanceTypeNodeLabelKey"`
Namespace string `json:"namespace"`
GenerateUniqueAppIds bool `json:"generateUniqueAppIds"`
- sync.RWMutex
+
+ locking.RWMutex
}
func (conf *SchedulerConf) Clone() *SchedulerConf {
diff --git a/pkg/dispatcher/dispatch_test.go b/pkg/dispatcher/dispatch_test.go
index 7060d476..cdf9a94b 100644
--- a/pkg/dispatcher/dispatch_test.go
+++ b/pkg/dispatcher/dispatch_test.go
@@ -22,7 +22,6 @@ import (
"fmt"
"runtime"
"strings"
- "sync"
"sync/atomic"
"testing"
"time"
@@ -31,6 +30,7 @@ import (
"github.com/apache/yunikorn-k8shim/pkg/common/events"
"github.com/apache/yunikorn-k8shim/pkg/common/utils"
+ "github.com/apache/yunikorn-k8shim/pkg/locking"
)
// app event for testing
@@ -77,7 +77,7 @@ func TestRegisterEventHandler(t *testing.T) {
type appEventsRecorder struct {
apps []string
- lock *sync.RWMutex
+ lock *locking.RWMutex
}
func (a *appEventsRecorder) addApp(appID string) {
@@ -109,7 +109,7 @@ func TestDispatcherStartStop(t *testing.T) {
// thread safe
recorder := &appEventsRecorder{
apps: make([]string, 0),
- lock: &sync.RWMutex{},
+ lock: &locking.RWMutex{},
}
RegisterEventHandler("TestAppHandler", EventTypeApp, func(obj
interface{}) {
@@ -165,7 +165,7 @@ func TestEventWillNotBeLostWhenEventChannelIsFull(t
*testing.T) {
// thread safe
recorder := &appEventsRecorder{
apps: make([]string, 0),
- lock: &sync.RWMutex{},
+ lock: &locking.RWMutex{},
}
// pretend to be an time-consuming event-handler
RegisterEventHandler("TestAppHandler", EventTypeApp, func(obj
interface{}) {
diff --git a/pkg/dispatcher/dispatcher.go b/pkg/dispatcher/dispatcher.go
index 51e46ec6..980bba3e 100644
--- a/pkg/dispatcher/dispatcher.go
+++ b/pkg/dispatcher/dispatcher.go
@@ -28,6 +28,7 @@ import (
"github.com/apache/yunikorn-k8shim/pkg/common/events"
"github.com/apache/yunikorn-k8shim/pkg/conf"
+ "github.com/apache/yunikorn-k8shim/pkg/locking"
"github.com/apache/yunikorn-k8shim/pkg/log"
)
@@ -55,7 +56,7 @@ type Dispatcher struct {
stopChan chan struct{}
handlers map[EventType]map[string]func(interface{})
running atomic.Value
- lock sync.RWMutex
+ lock locking.RWMutex
stopped sync.WaitGroup
}
@@ -66,7 +67,7 @@ func initDispatcher() {
handlers: make(map[EventType]map[string]func(interface{})),
stopChan: make(chan struct{}),
running: atomic.Value{},
- lock: sync.RWMutex{},
+ lock: locking.RWMutex{},
}
dispatcher.setRunning(false)
DispatchTimeout = conf.GetSchedulerConf().DispatchTimeout
diff --git a/pkg/locking/locking.go b/pkg/locking/locking.go
new file mode 100644
index 00000000..b837e903
--- /dev/null
+++ b/pkg/locking/locking.go
@@ -0,0 +1,44 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package locking
+
+import (
+ "sync"
+
+ godeadlock "github.com/sasha-s/go-deadlock"
+
+ corelocking "github.com/apache/yunikorn-core/pkg/locking"
+)
+
+var once sync.Once
+
+func init() {
+ once.Do(func() {
+ // call into core locking package to ensure that all locks are
globally configured
+ corelocking.IsTrackingEnabled()
+ })
+}
+
+type Mutex struct {
+ godeadlock.Mutex
+}
+
+type RWMutex struct {
+ godeadlock.RWMutex
+}
diff --git a/pkg/locking/locking_test.go b/pkg/locking/locking_test.go
new file mode 100644
index 00000000..fd4f4c5a
--- /dev/null
+++ b/pkg/locking/locking_test.go
@@ -0,0 +1,67 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+//nolint:staticcheck
+package locking
+
+import (
+ "sync/atomic"
+ "testing"
+ "time"
+
+ "gotest.tools/v3/assert"
+)
+
+func TestMutex(t *testing.T) {
+ var mutex Mutex
+ var result atomic.Int32
+ mutex.Lock()
+ go func() {
+ mutex.Lock()
+ result.Store(2)
+ mutex.Unlock()
+ }()
+ time.Sleep(100 * time.Millisecond)
+ result.Store(1)
+ mutex.Unlock()
+ time.Sleep(100 * time.Millisecond)
+ assert.Equal(t, int32(2), result.Load())
+}
+
+func TestRWMutex(t *testing.T) {
+ var mutex RWMutex
+ var count atomic.Int32
+ mutex.RLock()
+ go func() {
+ mutex.Lock()
+ count.Add(1)
+ mutex.Unlock()
+ }()
+ go func() {
+ mutex.Lock()
+ count.Add(1)
+ mutex.Unlock()
+ }()
+ time.Sleep(100 * time.Millisecond)
+ before := count.Load()
+ mutex.RUnlock()
+ time.Sleep(500 * time.Millisecond)
+ after := count.Load()
+ assert.Equal(t, before, int32(0))
+ assert.Equal(t, after, int32(2))
+}
diff --git a/pkg/plugin/scheduler_plugin.go b/pkg/plugin/scheduler_plugin.go
index 2c10f9e3..6d0351ca 100644
--- a/pkg/plugin/scheduler_plugin.go
+++ b/pkg/plugin/scheduler_plugin.go
@@ -21,7 +21,6 @@ package plugin
import (
"context"
"fmt"
- "sync"
"go.uber.org/zap"
v1 "k8s.io/api/core/v1"
@@ -38,6 +37,7 @@ import (
"github.com/apache/yunikorn-k8shim/pkg/common/utils"
"github.com/apache/yunikorn-k8shim/pkg/conf"
"github.com/apache/yunikorn-k8shim/pkg/dispatcher"
+ "github.com/apache/yunikorn-k8shim/pkg/locking"
"github.com/apache/yunikorn-k8shim/pkg/log"
"github.com/apache/yunikorn-k8shim/pkg/shim"
)
@@ -70,7 +70,7 @@ const (
// pod to be rescheduled, as this means the prior allocation could not be
completed successfully by the default
// scheduler for some reason.
type YuniKornSchedulerPlugin struct {
- sync.RWMutex
+ locking.RWMutex
context *cache.Context
}
diff --git a/pkg/shim/scheduler.go b/pkg/shim/scheduler.go
index cb53bb3f..3e78d7ff 100644
--- a/pkg/shim/scheduler.go
+++ b/pkg/shim/scheduler.go
@@ -19,7 +19,6 @@
package shim
import (
- "sync"
"time"
"go.uber.org/zap"
@@ -32,6 +31,7 @@ import (
"github.com/apache/yunikorn-k8shim/pkg/common/utils"
"github.com/apache/yunikorn-k8shim/pkg/conf"
"github.com/apache/yunikorn-k8shim/pkg/dispatcher"
+ "github.com/apache/yunikorn-k8shim/pkg/locking"
"github.com/apache/yunikorn-k8shim/pkg/log"
"github.com/apache/yunikorn-scheduler-interface/lib/go/api"
"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
@@ -44,7 +44,7 @@ type KubernetesShim struct {
phManager *cache.PlaceholderManager
callback api.ResourceManagerCallback
stopChan chan struct{}
- lock *sync.RWMutex
+ lock *locking.RWMutex
outstandingAppsFound bool
}
@@ -86,7 +86,7 @@ func newShimSchedulerInternal(ctx *cache.Context, apiFactory
client.APIProvider,
phManager:
cache.NewPlaceholderManager(apiFactory.GetAPIs()),
callback: cb,
stopChan: make(chan struct{}),
- lock: &sync.RWMutex{},
+ lock: &locking.RWMutex{},
outstandingAppsFound: false,
}
// init dispatcher
diff --git a/pkg/shim/scheduler_perf_test.go b/pkg/shim/scheduler_perf_test.go
index 940a1b2c..d3858399 100644
--- a/pkg/shim/scheduler_perf_test.go
+++ b/pkg/shim/scheduler_perf_test.go
@@ -26,7 +26,6 @@ import (
"runtime/pprof"
"strconv"
"strings"
- "sync"
"sync/atomic"
"testing"
"time"
@@ -40,6 +39,7 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
"github.com/apache/yunikorn-k8shim/pkg/common/constants"
+ "github.com/apache/yunikorn-k8shim/pkg/locking"
"github.com/apache/yunikorn-k8shim/pkg/log"
)
@@ -170,7 +170,7 @@ func addPodsToCluster(cluster *MockScheduler)
map[string]*v1.Pod {
type metricsCollector struct {
podsPerSec []float64
- sync.Mutex
+ locking.Mutex
}
func (m *metricsCollector) collectData() {
diff --git a/test/e2e/framework/helpers/k8s/k8s_utils.go
b/test/e2e/framework/helpers/k8s/k8s_utils.go
index 749831d1..b97b35bc 100644
--- a/test/e2e/framework/helpers/k8s/k8s_utils.go
+++ b/test/e2e/framework/helpers/k8s/k8s_utils.go
@@ -27,7 +27,6 @@ import (
"os/exec"
"path/filepath"
"strings"
- "sync"
"time"
"github.com/onsi/ginkgo/v2"
@@ -57,6 +56,7 @@ import (
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
"github.com/apache/yunikorn-k8shim/pkg/common/utils"
+ "github.com/apache/yunikorn-k8shim/pkg/locking"
"github.com/apache/yunikorn-k8shim/test/e2e/framework/configmanager"
"github.com/apache/yunikorn-k8shim/test/e2e/framework/helpers/common"
)
@@ -75,7 +75,7 @@ const (
)
var fw *portforward.PortForwarder
-var lock = &sync.Mutex{}
+var lock = &locking.Mutex{}
type KubeCtl struct {
clientSet *kubernetes.Clientset
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]