This is an automated email from the ASF dual-hosted git repository.

ccondit pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-k8shim.git


The following commit(s) were added to refs/heads/master by this push:
     new 64b204a2 [YUNIKORN-2539] Shim: Add deadlock tracking feature (#814)
64b204a2 is described below

commit 64b204a2fb3b83fde9d86ea58f5f0d1e42187472
Author: Craig Condit <[email protected]>
AuthorDate: Fri Apr 5 12:06:45 2024 -0500

    [YUNIKORN-2539] Shim: Add deadlock tracking feature (#814)
    
    Replaces sync.{RW}Mutex with internal locking.{RW}Mutex implementations.
    The new implementation wraps the go-deadlock library; deadlock
    detection is enabled or disabled globally (by calling into the
    yunikorn-core locking package) based on the values of the following
    environment variables:
    
    To enable the feature:
    
    - DEADLOCK_DETECTION_ENABLED=true
    
    To customize the timeout before potential deadlocks are logged (default
    is 60 seconds):
    
    - DEADLOCK_TIMEOUT_SECONDS=60
    
    See https://github.com/sasha-s/go-deadlock for more details.
    
    Closes: #814
---
 go.mod                                      |  5 ++-
 go.sum                                      |  8 +++-
 pkg/admission/conf/am_conf.go               |  4 +-
 pkg/admission/namespace_cache.go            |  5 +--
 pkg/admission/priority_class_cache.go       |  5 +--
 pkg/admission/webhook_manager.go            |  4 +-
 pkg/cache/application.go                    |  6 +--
 pkg/cache/application_test.go               |  8 ++--
 pkg/cache/context.go                        |  5 ++-
 pkg/cache/external/scheduler_cache.go       |  4 +-
 pkg/cache/placeholder_manager.go            |  6 +--
 pkg/cache/task.go                           |  6 +--
 pkg/cache/task_test.go                      |  5 ++-
 pkg/client/apifactory.go                    |  6 +--
 pkg/client/apifactory_mock.go               |  4 +-
 pkg/client/kubeclient_mock.go               |  6 +--
 pkg/cmd/admissioncontroller/main.go         |  4 +-
 pkg/common/events/recorder.go               |  3 +-
 pkg/common/test/schedulerapi_mock.go        |  6 +--
 pkg/conf/schedulerconf.go                   |  4 +-
 pkg/dispatcher/dispatch_test.go             |  8 ++--
 pkg/dispatcher/dispatcher.go                |  5 ++-
 pkg/locking/locking.go                      | 44 +++++++++++++++++++
 pkg/locking/locking_test.go                 | 67 +++++++++++++++++++++++++++++
 pkg/plugin/scheduler_plugin.go              |  4 +-
 pkg/shim/scheduler.go                       |  6 +--
 pkg/shim/scheduler_perf_test.go             |  4 +-
 test/e2e/framework/helpers/k8s/k8s_utils.go |  4 +-
 28 files changed, 184 insertions(+), 62 deletions(-)

diff --git a/go.mod b/go.mod
index 9498a248..b5677195 100644
--- a/go.mod
+++ b/go.mod
@@ -21,7 +21,7 @@ module github.com/apache/yunikorn-k8shim
 go 1.21
 
 require (
-       github.com/apache/yunikorn-core v0.0.0-20240402212227-bdf109b5432c
+       github.com/apache/yunikorn-core v0.0.0-20240405153113-5758d7ac3c85
        github.com/apache/yunikorn-scheduler-interface 
v0.0.0-20240402211642-e7421a4261fd
        github.com/google/go-cmp v0.6.0
        github.com/google/uuid v1.6.0
@@ -98,10 +98,12 @@ require (
        github.com/opencontainers/go-digest v1.0.0 // indirect
        github.com/opencontainers/selinux v1.11.0 // indirect
        github.com/peterbourgon/diskv v2.0.1+incompatible // indirect
+       github.com/petermattis/goid v0.0.0-20240327183114-c42a807a84ba // 
indirect
        github.com/pkg/errors v0.9.1 // indirect
        github.com/prometheus/client_model v0.5.0 // indirect
        github.com/prometheus/common v0.45.0 // indirect
        github.com/prometheus/procfs v0.12.0 // indirect
+       github.com/sasha-s/go-deadlock v0.3.1 // indirect
        github.com/spf13/cobra v1.7.0 // indirect
        github.com/spf13/pflag v1.0.5 // indirect
        github.com/stoewer/go-strcase v1.2.0 // indirect
@@ -161,6 +163,7 @@ require (
 
 replace (
        github.com/opencontainers/runc => github.com/opencontainers/runc v1.1.12
+       github.com/petermattis/goid => github.com/petermattis/goid 
v0.0.0-20240327183114-c42a807a84ba
        golang.org/x/crypto => golang.org/x/crypto v0.19.0
        golang.org/x/lint => golang.org/x/lint 
v0.0.0-20210508222113-6edffad5e616
        golang.org/x/net => golang.org/x/net v0.21.0
diff --git a/go.sum b/go.sum
index 6c2622fb..fad0a064 100644
--- a/go.sum
+++ b/go.sum
@@ -9,8 +9,8 @@ github.com/NYTimes/gziphandler v1.1.1 
h1:ZUDjpQae29j0ryrS0u/B8HZfJBtBQHjqw2rQ2cq
 github.com/NYTimes/gziphandler v1.1.1/go.mod 
h1:n/CVRwUEOgIxrgPvAQhUUr9oeUtvrhMomdKFjzJNB0c=
 github.com/antlr/antlr4/runtime/Go/antlr/v4 v4.0.0-20230305170008-8188dc5388df 
h1:7RFfzj4SSt6nnvCPbCqijJi1nWCd+TqAT3bYCStRC18=
 github.com/antlr/antlr4/runtime/Go/antlr/v4 
v4.0.0-20230305170008-8188dc5388df/go.mod 
h1:pSwJ0fSY5KhvocuWSx4fz3BA8OrA1bQn+K1Eli3BRwM=
-github.com/apache/yunikorn-core v0.0.0-20240402212227-bdf109b5432c 
h1:WoO71GKblZEKBOuWviJMD5f1W6tdbJp5Pv/utd4zYqw=
-github.com/apache/yunikorn-core v0.0.0-20240402212227-bdf109b5432c/go.mod 
h1:RZCBSMe6UZ04b45ZzwvuhhkY2f7f8ZW7ERvVMUM6dy4=
+github.com/apache/yunikorn-core v0.0.0-20240405153113-5758d7ac3c85 
h1:bPbrFZc+qgsepdJXclpgwGLYEPeKSL6W69i+RUjPc6o=
+github.com/apache/yunikorn-core v0.0.0-20240405153113-5758d7ac3c85/go.mod 
h1:DnScYvh1qQ7v89tebVH43LcuEDoUXLy2wm8aE4Co75Y=
 github.com/apache/yunikorn-scheduler-interface 
v0.0.0-20240402211642-e7421a4261fd 
h1:uNOijHkCotZLUZ+A85NSftEJGfP50Opf7ms6Daj6pco=
 github.com/apache/yunikorn-scheduler-interface 
v0.0.0-20240402211642-e7421a4261fd/go.mod 
h1:0f4l3ManMROX60xU7GbhejCEYYyMksH275oY2xIVkbM=
 github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 
h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio=
@@ -175,6 +175,8 @@ github.com/opencontainers/selinux v1.11.0 
h1:+5Zbo97w3Lbmb3PeqQtpmTkMwsW5nRI3YaL
 github.com/opencontainers/selinux v1.11.0/go.mod 
h1:E5dMC3VPuVvVHDYmi78qvhJp8+M586T4DlDRYpFkyec=
 github.com/peterbourgon/diskv v2.0.1+incompatible 
h1:UBdAOUP5p4RWqPBg048CAvpKN+vxiaj6gdUUzhl4XmI=
 github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod 
h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU=
+github.com/petermattis/goid v0.0.0-20240327183114-c42a807a84ba 
h1:3jPgmsFGBID1wFfU2AbYocNcN4wqU68UaHSdMjiw/7U=
+github.com/petermattis/goid v0.0.0-20240327183114-c42a807a84ba/go.mod 
h1:pxMtw7cyUw6B2bRH0ZBANSPg+AoSud1I1iyJHI69jH4=
 github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
 github.com/pkg/errors v0.9.1/go.mod 
h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
 github.com/pmezard/go-difflib v1.0.0 
h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
@@ -190,6 +192,8 @@ github.com/prometheus/procfs v0.12.0/go.mod 
h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3c
 github.com/rogpeppe/go-internal v1.10.0 
h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
 github.com/rogpeppe/go-internal v1.10.0/go.mod 
h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog=
 github.com/russross/blackfriday/v2 v2.1.0/go.mod 
h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
+github.com/sasha-s/go-deadlock v0.3.1 
h1:sqv7fDNShgjcaxkO0JNcOAlr8B9+cV5Ey/OB71efZx0=
+github.com/sasha-s/go-deadlock v0.3.1/go.mod 
h1:F73l+cr82YSh10GxyRI6qZiCgK64VaZjwesgfQ1/iLM=
 github.com/sergi/go-diff v1.1.0 h1:we8PVUC3FE2uYfodKH/nBHMSetSfHDR6scGdBi+erh0=
 github.com/sergi/go-diff v1.1.0/go.mod 
h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM=
 github.com/sirupsen/logrus v1.9.0 
h1:trlNQbNUG3OdDrDil03MCb1H2o9nJ1x4/5LYw7byDE0=
diff --git a/pkg/admission/conf/am_conf.go b/pkg/admission/conf/am_conf.go
index 529cd2e8..154df9ce 100644
--- a/pkg/admission/conf/am_conf.go
+++ b/pkg/admission/conf/am_conf.go
@@ -23,7 +23,6 @@ import (
        "regexp"
        "strconv"
        "strings"
-       "sync"
 
        "go.uber.org/zap"
        v1 "k8s.io/api/core/v1"
@@ -33,6 +32,7 @@ import (
        "github.com/apache/yunikorn-k8shim/pkg/common/constants"
        "github.com/apache/yunikorn-k8shim/pkg/common/utils"
        schedulerconf "github.com/apache/yunikorn-k8shim/pkg/conf"
+       "github.com/apache/yunikorn-k8shim/pkg/locking"
        "github.com/apache/yunikorn-k8shim/pkg/log"
 )
 
@@ -105,7 +105,7 @@ type AdmissionControllerConf struct {
        defaultQueueName        string
        configMaps              []*v1.ConfigMap
 
-       lock sync.RWMutex
+       lock locking.RWMutex
 }
 
 func NewAdmissionControllerConf(configMaps []*v1.ConfigMap) 
*AdmissionControllerConf {
diff --git a/pkg/admission/namespace_cache.go b/pkg/admission/namespace_cache.go
index 4a66f073..c5ff8065 100644
--- a/pkg/admission/namespace_cache.go
+++ b/pkg/admission/namespace_cache.go
@@ -19,20 +19,19 @@
 package admission
 
 import (
-       "sync"
-
        v1 "k8s.io/api/core/v1"
        informersv1 "k8s.io/client-go/informers/core/v1"
        "k8s.io/client-go/tools/cache"
 
        "github.com/apache/yunikorn-k8shim/pkg/common/constants"
+       "github.com/apache/yunikorn-k8shim/pkg/locking"
        "github.com/apache/yunikorn-k8shim/pkg/log"
 )
 
 type NamespaceCache struct {
        nameSpaces map[string]nsFlags
 
-       sync.RWMutex
+       locking.RWMutex
 }
 
 type triState int
diff --git a/pkg/admission/priority_class_cache.go 
b/pkg/admission/priority_class_cache.go
index 58bec607..b640ce83 100644
--- a/pkg/admission/priority_class_cache.go
+++ b/pkg/admission/priority_class_cache.go
@@ -19,21 +19,20 @@
 package admission
 
 import (
-       "sync"
-
        schedulingv1 "k8s.io/api/scheduling/v1"
        informersv1 "k8s.io/client-go/informers/scheduling/v1"
        "k8s.io/client-go/tools/cache"
 
        "github.com/apache/yunikorn-k8shim/pkg/common/constants"
        "github.com/apache/yunikorn-k8shim/pkg/common/utils"
+       "github.com/apache/yunikorn-k8shim/pkg/locking"
        "github.com/apache/yunikorn-k8shim/pkg/log"
 )
 
 type PriorityClassCache struct {
        priorityClasses map[string]bool
 
-       sync.RWMutex
+       locking.RWMutex
 }
 
 // NewPriorityClassCache creates a new cache and registers the handler for the 
cache with the Informer.
diff --git a/pkg/admission/webhook_manager.go b/pkg/admission/webhook_manager.go
index 5a419fc3..2d50404a 100644
--- a/pkg/admission/webhook_manager.go
+++ b/pkg/admission/webhook_manager.go
@@ -25,7 +25,6 @@ import (
        "crypto/x509"
        "errors"
        "fmt"
-       "sync"
        "time"
 
        "go.uber.org/zap"
@@ -37,6 +36,7 @@ import (
        "github.com/apache/yunikorn-k8shim/pkg/admission/conf"
        "github.com/apache/yunikorn-k8shim/pkg/admission/pki"
        "github.com/apache/yunikorn-k8shim/pkg/client"
+       "github.com/apache/yunikorn-k8shim/pkg/locking"
        "github.com/apache/yunikorn-k8shim/pkg/log"
 )
 
@@ -81,7 +81,7 @@ type webhookManagerImpl struct {
        caKey2     *rsa.PrivateKey
        expiration time.Time
 
-       sync.RWMutex
+       locking.RWMutex
 }
 
 // NewWebhookManager is used to create a new webhook manager
diff --git a/pkg/cache/application.go b/pkg/cache/application.go
index 06295b52..f43d97f4 100644
--- a/pkg/cache/application.go
+++ b/pkg/cache/application.go
@@ -23,7 +23,6 @@ import (
        "fmt"
        "sort"
        "strings"
-       "sync"
 
        "github.com/looplab/fsm"
        "go.uber.org/zap"
@@ -35,6 +34,7 @@ import (
        "github.com/apache/yunikorn-k8shim/pkg/common/events"
        "github.com/apache/yunikorn-k8shim/pkg/conf"
        "github.com/apache/yunikorn-k8shim/pkg/dispatcher"
+       "github.com/apache/yunikorn-k8shim/pkg/locking"
        "github.com/apache/yunikorn-k8shim/pkg/log"
        "github.com/apache/yunikorn-scheduler-interface/lib/go/api"
        "github.com/apache/yunikorn-scheduler-interface/lib/go/si"
@@ -53,7 +53,7 @@ type Application struct {
        schedulingParamsDefinition string
        placeholderOwnerReferences []metav1.OwnerReference
        sm                         *fsm.FSM
-       lock                       *sync.RWMutex
+       lock                       *locking.RWMutex
        schedulerAPI               api.SchedulerAPI
        placeholderAsk             *si.Resource // total placeholder request 
for the app (all task groups)
        placeholderTimeoutInSec    int64
@@ -81,7 +81,7 @@ func NewApplication(appID, queueName, user string, groups 
[]string, tags map[str
                tags:                    tags,
                sm:                      newAppState(),
                taskGroups:              make([]TaskGroup, 0),
-               lock:                    &sync.RWMutex{},
+               lock:                    &locking.RWMutex{},
                schedulerAPI:            scheduler,
                placeholderTimeoutInSec: 0,
                schedulingStyle:         
constants.SchedulingPolicyStyleParamDefault,
diff --git a/pkg/cache/application_test.go b/pkg/cache/application_test.go
index edd3846f..874a6e14 100644
--- a/pkg/cache/application_test.go
+++ b/pkg/cache/application_test.go
@@ -22,7 +22,6 @@ import (
        "fmt"
        "sort"
        "strings"
-       "sync"
        "testing"
        "time"
 
@@ -41,6 +40,7 @@ import (
        "github.com/apache/yunikorn-k8shim/pkg/common/utils"
        "github.com/apache/yunikorn-k8shim/pkg/conf"
        "github.com/apache/yunikorn-k8shim/pkg/dispatcher"
+       "github.com/apache/yunikorn-k8shim/pkg/locking"
        "github.com/apache/yunikorn-scheduler-interface/lib/go/api"
        siCommon "github.com/apache/yunikorn-scheduler-interface/lib/go/common"
        "github.com/apache/yunikorn-scheduler-interface/lib/go/si"
@@ -48,7 +48,7 @@ import (
 
 type recorderTime struct {
        time int64
-       lock *sync.RWMutex
+       lock *locking.RWMutex
 }
 
 func TestNewApplication(t *testing.T) {
@@ -129,7 +129,7 @@ func TestFailApplication(t *testing.T) {
 
        rt := &recorderTime{
                time: int64(0),
-               lock: &sync.RWMutex{},
+               lock: &locking.RWMutex{},
        }
        ms := &mockSchedulerAPI{}
        // set test mode
@@ -662,7 +662,7 @@ func TestSetTaskGroupsAndSchedulingPolicy(t *testing.T) {
 
 type threadSafePodsMap struct {
        pods map[string]*v1.Pod
-       sync.RWMutex
+       locking.RWMutex
 }
 
 func newThreadSafePodsMap() *threadSafePodsMap {
diff --git a/pkg/cache/context.go b/pkg/cache/context.go
index 8e003336..c6caea06 100644
--- a/pkg/cache/context.go
+++ b/pkg/cache/context.go
@@ -46,6 +46,7 @@ import (
        "github.com/apache/yunikorn-k8shim/pkg/common/utils"
        schedulerconf "github.com/apache/yunikorn-k8shim/pkg/conf"
        "github.com/apache/yunikorn-k8shim/pkg/dispatcher"
+       "github.com/apache/yunikorn-k8shim/pkg/locking"
        "github.com/apache/yunikorn-k8shim/pkg/log"
        "github.com/apache/yunikorn-k8shim/pkg/plugin/predicates"
        "github.com/apache/yunikorn-k8shim/pkg/plugin/support"
@@ -64,7 +65,7 @@ type Context struct {
        pluginMode     bool                           // true if we are 
configured as a scheduler plugin
        namespace      string                         // yunikorn namespace
        configMaps     []*v1.ConfigMap                // cached yunikorn 
configmaps
-       lock           *sync.RWMutex                  // lock
+       lock           *locking.RWMutex               // lock
        txnID          atomic.Uint64                  // transaction ID counter
        klogger        klog.Logger
 }
@@ -87,7 +88,7 @@ func NewContextWithBootstrapConfigMaps(apis 
client.APIProvider, bootstrapConfigM
                apiProvider:  apis,
                namespace:    apis.GetAPIs().GetConf().Namespace,
                configMaps:   bootstrapConfigMaps,
-               lock:         &sync.RWMutex{},
+               lock:         &locking.RWMutex{},
                klogger:      klog.NewKlogr(),
        }
 
diff --git a/pkg/cache/external/scheduler_cache.go 
b/pkg/cache/external/scheduler_cache.go
index 61d49b94..43958495 100644
--- a/pkg/cache/external/scheduler_cache.go
+++ b/pkg/cache/external/scheduler_cache.go
@@ -20,7 +20,6 @@ package external
 
 import (
        "fmt"
-       "sync"
        "sync/atomic"
 
        "go.uber.org/zap"
@@ -35,6 +34,7 @@ import (
        "github.com/apache/yunikorn-k8shim/pkg/client"
        "github.com/apache/yunikorn-k8shim/pkg/common"
        "github.com/apache/yunikorn-k8shim/pkg/common/utils"
+       "github.com/apache/yunikorn-k8shim/pkg/locking"
        "github.com/apache/yunikorn-k8shim/pkg/log"
        "github.com/apache/yunikorn-scheduler-interface/lib/go/si"
 )
@@ -70,7 +70,7 @@ type SchedulerCache struct {
        inProgressAllocations map[string]string      // map of pod to node ID, 
presence indicates an in-process allocation for scheduler
        schedulingTasks       map[string]interface{} // list of task IDs which 
are currently being processed by the scheduler
        pvcRefCounts          map[string]map[string]int
-       lock                  sync.RWMutex
+       lock                  locking.RWMutex
        clients               *client.Clients // client APIs
        klogger               klog.Logger
 
diff --git a/pkg/cache/placeholder_manager.go b/pkg/cache/placeholder_manager.go
index 03113071..230c77c6 100644
--- a/pkg/cache/placeholder_manager.go
+++ b/pkg/cache/placeholder_manager.go
@@ -20,7 +20,6 @@ package cache
 
 import (
        "strings"
-       "sync"
        "sync/atomic"
        "time"
 
@@ -28,6 +27,7 @@ import (
        v1 "k8s.io/api/core/v1"
 
        "github.com/apache/yunikorn-k8shim/pkg/client"
+       "github.com/apache/yunikorn-k8shim/pkg/locking"
        "github.com/apache/yunikorn-k8shim/pkg/log"
 )
 
@@ -43,12 +43,12 @@ type PlaceholderManager struct {
        running     atomic.Value
        cleanupTime time.Duration
        // a simple mutex will do we do not have separate read and write paths
-       sync.RWMutex
+       locking.RWMutex
 }
 
 var (
        placeholderMgr *PlaceholderManager
-       mu             sync.Mutex
+       mu             locking.Mutex
 )
 
 func NewPlaceholderManager(clients *client.Clients) *PlaceholderManager {
diff --git a/pkg/cache/task.go b/pkg/cache/task.go
index 088c0ebd..bb54f4c0 100644
--- a/pkg/cache/task.go
+++ b/pkg/cache/task.go
@@ -22,7 +22,6 @@ import (
        "context"
        "fmt"
        "strconv"
-       "sync"
        "time"
 
        "github.com/looplab/fsm"
@@ -35,6 +34,7 @@ import (
        "github.com/apache/yunikorn-k8shim/pkg/common/events"
        "github.com/apache/yunikorn-k8shim/pkg/common/utils"
        "github.com/apache/yunikorn-k8shim/pkg/dispatcher"
+       "github.com/apache/yunikorn-k8shim/pkg/locking"
        "github.com/apache/yunikorn-k8shim/pkg/log"
        "github.com/apache/yunikorn-scheduler-interface/lib/go/si"
 )
@@ -57,7 +57,7 @@ type Task struct {
        originator      bool
        schedulingState TaskSchedulingState
        sm              *fsm.FSM
-       lock            *sync.RWMutex
+       lock            *locking.RWMutex
 }
 
 func NewTask(tid string, app *Application, ctx *Context, pod *v1.Pod) *Task {
@@ -101,7 +101,7 @@ func createTaskInternal(tid string, app *Application, 
resource *si.Resource,
                context:         ctx,
                sm:              newTaskState(),
                schedulingState: TaskSchedPending,
-               lock:            &sync.RWMutex{},
+               lock:            &locking.RWMutex{},
        }
        if tgName := utils.GetTaskGroupFromPodSpec(pod); tgName != "" {
                task.taskGroupName = tgName
diff --git a/pkg/cache/task_test.go b/pkg/cache/task_test.go
index e36bc4da..c08c4f03 100644
--- a/pkg/cache/task_test.go
+++ b/pkg/cache/task_test.go
@@ -19,7 +19,6 @@
 package cache
 
 import (
-       "sync"
        "testing"
        "time"
 
@@ -36,6 +35,8 @@ import (
        "github.com/apache/yunikorn-k8shim/pkg/common/constants"
        "github.com/apache/yunikorn-k8shim/pkg/common/events"
        "github.com/apache/yunikorn-k8shim/pkg/conf"
+       "github.com/apache/yunikorn-k8shim/pkg/locking"
+
        "github.com/apache/yunikorn-scheduler-interface/lib/go/si"
 )
 
@@ -523,7 +524,7 @@ func TestHandleSubmitTaskEvent(t *testing.T) {
        mockedContext.addPriorityClass(priorityClass2)
        rt := &recorderTime{
                time: int64(0),
-               lock: &sync.RWMutex{},
+               lock: &locking.RWMutex{},
        }
        conf.GetSchedulerConf().SetTestMode(true)
        mr := events.NewMockedRecorder()
diff --git a/pkg/client/apifactory.go b/pkg/client/apifactory.go
index 9ceeb103..4efdefa3 100644
--- a/pkg/client/apifactory.go
+++ b/pkg/client/apifactory.go
@@ -19,7 +19,6 @@
 package client
 
 import (
-       "sync"
        "time"
 
        "go.uber.org/zap"
@@ -29,6 +28,7 @@ import (
        "k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumebinding"
 
        "github.com/apache/yunikorn-k8shim/pkg/conf"
+       "github.com/apache/yunikorn-k8shim/pkg/locking"
        "github.com/apache/yunikorn-k8shim/pkg/log"
        "github.com/apache/yunikorn-scheduler-interface/lib/go/api"
 )
@@ -76,7 +76,7 @@ type APIFactory struct {
        clients  *Clients
        testMode bool
        stopChan chan struct{}
-       lock     *sync.RWMutex
+       lock     *locking.RWMutex
 }
 
 func NewAPIFactory(scheduler api.SchedulerAPI, informerFactory 
informers.SharedInformerFactory, configs *conf.SchedulerConf, testMode bool) 
*APIFactory {
@@ -130,7 +130,7 @@ func NewAPIFactory(scheduler api.SchedulerAPI, 
informerFactory informers.SharedI
                },
                testMode: testMode,
                stopChan: make(chan struct{}),
-               lock:     &sync.RWMutex{},
+               lock:     &locking.RWMutex{},
        }
 }
 
diff --git a/pkg/client/apifactory_mock.go b/pkg/client/apifactory_mock.go
index 20c9a814..35decd4c 100644
--- a/pkg/client/apifactory_mock.go
+++ b/pkg/client/apifactory_mock.go
@@ -19,7 +19,6 @@
 package client
 
 import (
-       "sync"
        "time"
 
        "go.uber.org/zap"
@@ -34,12 +33,13 @@ import (
 
        "github.com/apache/yunikorn-k8shim/pkg/common/test"
        "github.com/apache/yunikorn-k8shim/pkg/conf"
+       "github.com/apache/yunikorn-k8shim/pkg/locking"
        "github.com/apache/yunikorn-k8shim/pkg/log"
        "github.com/apache/yunikorn-scheduler-interface/lib/go/si"
 )
 
 type MockedAPIProvider struct {
-       sync.Mutex
+       locking.Mutex
        clients *Clients
 
        stop         chan struct{}
diff --git a/pkg/client/kubeclient_mock.go b/pkg/client/kubeclient_mock.go
index bc890df2..3143f4d3 100644
--- a/pkg/client/kubeclient_mock.go
+++ b/pkg/client/kubeclient_mock.go
@@ -20,7 +20,6 @@ package client
 
 import (
        "fmt"
-       "sync"
        "time"
 
        "go.uber.org/zap"
@@ -29,6 +28,7 @@ import (
        "k8s.io/client-go/kubernetes/fake"
        "k8s.io/client-go/rest"
 
+       "github.com/apache/yunikorn-k8shim/pkg/locking"
        "github.com/apache/yunikorn-k8shim/pkg/log"
 )
 
@@ -42,7 +42,7 @@ type KubeClientMock struct {
        getFn          func(podName string) (*v1.Pod, error)
        clientSet      kubernetes.Interface
        pods           map[string]*v1.Pod
-       lock           sync.RWMutex
+       lock           locking.RWMutex
        bindStats      BindStats
        boundPods      []BoundPod
 }
@@ -107,7 +107,7 @@ func NewKubeClientMock(err bool) *KubeClientMock {
                },
                clientSet: fake.NewSimpleClientset(),
                pods:      make(map[string]*v1.Pod),
-               lock:      sync.RWMutex{},
+               lock:      locking.RWMutex{},
                boundPods: make([]BoundPod, 0, 1024),
        }
 
diff --git a/pkg/cmd/admissioncontroller/main.go 
b/pkg/cmd/admissioncontroller/main.go
index e96dd1b0..815f9294 100644
--- a/pkg/cmd/admissioncontroller/main.go
+++ b/pkg/cmd/admissioncontroller/main.go
@@ -26,7 +26,6 @@ import (
        "net/http"
        "os"
        "os/signal"
-       "sync"
        "syscall"
 
        "go.uber.org/zap"
@@ -34,6 +33,7 @@ import (
        "github.com/apache/yunikorn-k8shim/pkg/admission"
        "github.com/apache/yunikorn-k8shim/pkg/admission/conf"
        "github.com/apache/yunikorn-k8shim/pkg/client"
+       "github.com/apache/yunikorn-k8shim/pkg/locking"
        "github.com/apache/yunikorn-k8shim/pkg/log"
 )
 
@@ -48,7 +48,7 @@ type WebHook struct {
        ac     *admission.AdmissionController
        port   int
        server *http.Server
-       sync.Mutex
+       locking.Mutex
 }
 
 func main() {
diff --git a/pkg/common/events/recorder.go b/pkg/common/events/recorder.go
index 81d9831c..161c0eef 100644
--- a/pkg/common/events/recorder.go
+++ b/pkg/common/events/recorder.go
@@ -27,11 +27,12 @@ import (
        "github.com/apache/yunikorn-k8shim/pkg/client"
        "github.com/apache/yunikorn-k8shim/pkg/common/constants"
        "github.com/apache/yunikorn-k8shim/pkg/conf"
+       "github.com/apache/yunikorn-k8shim/pkg/locking"
 )
 
 var eventRecorder events.EventRecorder = events.NewFakeRecorder(1024)
 var once sync.Once
-var lock sync.RWMutex
+var lock locking.RWMutex
 
 func GetRecorder() events.EventRecorder {
        lock.Lock()
diff --git a/pkg/common/test/schedulerapi_mock.go 
b/pkg/common/test/schedulerapi_mock.go
index 0c2ce693..c0b781cb 100644
--- a/pkg/common/test/schedulerapi_mock.go
+++ b/pkg/common/test/schedulerapi_mock.go
@@ -19,9 +19,9 @@
 package test
 
 import (
-       "sync"
        "sync/atomic"
 
+       "github.com/apache/yunikorn-k8shim/pkg/locking"
        "github.com/apache/yunikorn-scheduler-interface/lib/go/api"
        "github.com/apache/yunikorn-scheduler-interface/lib/go/si"
 )
@@ -36,7 +36,7 @@ type SchedulerAPIMock struct {
        UpdateAllocationFn  func(request *si.AllocationRequest) error
        UpdateApplicationFn func(request *si.ApplicationRequest) error
        UpdateNodeFn        func(request *si.NodeRequest) error
-       lock                sync.Mutex
+       lock                locking.Mutex
 }
 
 func NewSchedulerAPIMock() *SchedulerAPIMock {
@@ -58,7 +58,7 @@ func NewSchedulerAPIMock() *SchedulerAPIMock {
                UpdateNodeFn: func(request *si.NodeRequest) error {
                        return nil
                },
-               lock: sync.Mutex{},
+               lock: locking.Mutex{},
        }
 }
 
diff --git a/pkg/conf/schedulerconf.go b/pkg/conf/schedulerconf.go
index cd46c27e..da8336a0 100644
--- a/pkg/conf/schedulerconf.go
+++ b/pkg/conf/schedulerconf.go
@@ -39,6 +39,7 @@ import (
        "k8s.io/klog/v2"
 
        "github.com/apache/yunikorn-k8shim/pkg/common/constants"
+       "github.com/apache/yunikorn-k8shim/pkg/locking"
        "github.com/apache/yunikorn-k8shim/pkg/log"
 )
 
@@ -128,7 +129,8 @@ type SchedulerConf struct {
        InstanceTypeNodeLabelKey string        `json:"instanceTypeNodeLabelKey"`
        Namespace                string        `json:"namespace"`
        GenerateUniqueAppIds     bool          `json:"generateUniqueAppIds"`
-       sync.RWMutex
+
+       locking.RWMutex
 }
 
 func (conf *SchedulerConf) Clone() *SchedulerConf {
diff --git a/pkg/dispatcher/dispatch_test.go b/pkg/dispatcher/dispatch_test.go
index 7060d476..cdf9a94b 100644
--- a/pkg/dispatcher/dispatch_test.go
+++ b/pkg/dispatcher/dispatch_test.go
@@ -22,7 +22,6 @@ import (
        "fmt"
        "runtime"
        "strings"
-       "sync"
        "sync/atomic"
        "testing"
        "time"
@@ -31,6 +30,7 @@ import (
 
        "github.com/apache/yunikorn-k8shim/pkg/common/events"
        "github.com/apache/yunikorn-k8shim/pkg/common/utils"
+       "github.com/apache/yunikorn-k8shim/pkg/locking"
 )
 
 // app event for testing
@@ -77,7 +77,7 @@ func TestRegisterEventHandler(t *testing.T) {
 
 type appEventsRecorder struct {
        apps []string
-       lock *sync.RWMutex
+       lock *locking.RWMutex
 }
 
 func (a *appEventsRecorder) addApp(appID string) {
@@ -109,7 +109,7 @@ func TestDispatcherStartStop(t *testing.T) {
        // thread safe
        recorder := &appEventsRecorder{
                apps: make([]string, 0),
-               lock: &sync.RWMutex{},
+               lock: &locking.RWMutex{},
        }
 
        RegisterEventHandler("TestAppHandler", EventTypeApp, func(obj 
interface{}) {
@@ -165,7 +165,7 @@ func TestEventWillNotBeLostWhenEventChannelIsFull(t 
*testing.T) {
        // thread safe
        recorder := &appEventsRecorder{
                apps: make([]string, 0),
-               lock: &sync.RWMutex{},
+               lock: &locking.RWMutex{},
        }
        // pretend to be an time-consuming event-handler
        RegisterEventHandler("TestAppHandler", EventTypeApp, func(obj 
interface{}) {
diff --git a/pkg/dispatcher/dispatcher.go b/pkg/dispatcher/dispatcher.go
index 51e46ec6..980bba3e 100644
--- a/pkg/dispatcher/dispatcher.go
+++ b/pkg/dispatcher/dispatcher.go
@@ -28,6 +28,7 @@ import (
 
        "github.com/apache/yunikorn-k8shim/pkg/common/events"
        "github.com/apache/yunikorn-k8shim/pkg/conf"
+       "github.com/apache/yunikorn-k8shim/pkg/locking"
        "github.com/apache/yunikorn-k8shim/pkg/log"
 )
 
@@ -55,7 +56,7 @@ type Dispatcher struct {
        stopChan  chan struct{}
        handlers  map[EventType]map[string]func(interface{})
        running   atomic.Value
-       lock      sync.RWMutex
+       lock      locking.RWMutex
        stopped   sync.WaitGroup
 }
 
@@ -66,7 +67,7 @@ func initDispatcher() {
                handlers:  make(map[EventType]map[string]func(interface{})),
                stopChan:  make(chan struct{}),
                running:   atomic.Value{},
-               lock:      sync.RWMutex{},
+               lock:      locking.RWMutex{},
        }
        dispatcher.setRunning(false)
        DispatchTimeout = conf.GetSchedulerConf().DispatchTimeout
diff --git a/pkg/locking/locking.go b/pkg/locking/locking.go
new file mode 100644
index 00000000..b837e903
--- /dev/null
+++ b/pkg/locking/locking.go
@@ -0,0 +1,44 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package locking
+
+import (
+       "sync"
+
+       godeadlock "github.com/sasha-s/go-deadlock"
+
+       corelocking "github.com/apache/yunikorn-core/pkg/locking"
+)
+
+var once sync.Once
+
+func init() {
+       once.Do(func() {
+               // call into core locking package to ensure that all locks are 
globally configured
+               corelocking.IsTrackingEnabled()
+       })
+}
+
+type Mutex struct {
+       godeadlock.Mutex
+}
+
+type RWMutex struct {
+       godeadlock.RWMutex
+}
diff --git a/pkg/locking/locking_test.go b/pkg/locking/locking_test.go
new file mode 100644
index 00000000..fd4f4c5a
--- /dev/null
+++ b/pkg/locking/locking_test.go
@@ -0,0 +1,67 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+//nolint:staticcheck
+package locking
+
+import (
+       "sync/atomic"
+       "testing"
+       "time"
+
+       "gotest.tools/v3/assert"
+)
+
+// TestMutex verifies that a second Lock() blocks until the holder unlocks.
+func TestMutex(t *testing.T) {
+       var mutex Mutex
+       var result atomic.Int32
+       done := make(chan struct{})
+       mutex.Lock()
+       go func() {
+               defer close(done)
+               mutex.Lock()
+               result.Store(2)
+               mutex.Unlock()
+       }()
+       // Give the goroutine time to block on Lock(). If mutual exclusion were
+       // broken it would store 2 before the 1 below and the final check fails.
+       time.Sleep(100 * time.Millisecond)
+       result.Store(1)
+       mutex.Unlock()
+       // Wait for the goroutine to finish instead of sleeping a fixed time,
+       // but bound the wait so a lost wake-up fails rather than hangs.
+       select {
+       case <-done:
+       case <-time.After(5 * time.Second):
+               t.Fatal("timed out waiting for goroutine to acquire the mutex")
+       }
+       assert.Equal(t, int32(2), result.Load())
+}
+
+// TestRWMutex verifies that writers are blocked while a read lock is held and
+// that both writers eventually acquire the lock once it is released.
+func TestRWMutex(t *testing.T) {
+       const writers = 2
+       var mutex RWMutex
+       var count atomic.Int32
+       // Buffered so a finishing writer never blocks on the signal.
+       done := make(chan struct{}, writers)
+       mutex.RLock()
+       for i := 0; i < writers; i++ {
+               go func() {
+                       defer func() { done <- struct{}{} }()
+                       mutex.Lock()
+                       count.Add(1)
+                       mutex.Unlock()
+               }()
+       }
+       // Give the writers time to block on Lock() while the read lock is held.
+       time.Sleep(100 * time.Millisecond)
+       before := count.Load()
+       mutex.RUnlock()
+       // Wait for every writer rather than sleeping a fixed time; bound the
+       // wait so a lost wake-up fails the test instead of hanging it.
+       for i := 0; i < writers; i++ {
+               select {
+               case <-done:
+               case <-time.After(5 * time.Second):
+                       t.Fatal("timed out waiting for writers to acquire the lock")
+               }
+       }
+       after := count.Load()
+       assert.Equal(t, int32(0), before)
+       assert.Equal(t, int32(writers), after)
+}
diff --git a/pkg/plugin/scheduler_plugin.go b/pkg/plugin/scheduler_plugin.go
index 2c10f9e3..6d0351ca 100644
--- a/pkg/plugin/scheduler_plugin.go
+++ b/pkg/plugin/scheduler_plugin.go
@@ -21,7 +21,6 @@ package plugin
 import (
        "context"
        "fmt"
-       "sync"
 
        "go.uber.org/zap"
        v1 "k8s.io/api/core/v1"
@@ -38,6 +37,7 @@ import (
        "github.com/apache/yunikorn-k8shim/pkg/common/utils"
        "github.com/apache/yunikorn-k8shim/pkg/conf"
        "github.com/apache/yunikorn-k8shim/pkg/dispatcher"
+       "github.com/apache/yunikorn-k8shim/pkg/locking"
        "github.com/apache/yunikorn-k8shim/pkg/log"
        "github.com/apache/yunikorn-k8shim/pkg/shim"
 )
@@ -70,7 +70,7 @@ const (
 // pod to be rescheduled, as this means the prior allocation could not be completed successfully by the default
 // scheduler for some reason.
 type YuniKornSchedulerPlugin struct {
-       sync.RWMutex
+       locking.RWMutex
        context *cache.Context
 }
 
diff --git a/pkg/shim/scheduler.go b/pkg/shim/scheduler.go
index cb53bb3f..3e78d7ff 100644
--- a/pkg/shim/scheduler.go
+++ b/pkg/shim/scheduler.go
@@ -19,7 +19,6 @@
 package shim
 
 import (
-       "sync"
        "time"
 
        "go.uber.org/zap"
@@ -32,6 +31,7 @@ import (
        "github.com/apache/yunikorn-k8shim/pkg/common/utils"
        "github.com/apache/yunikorn-k8shim/pkg/conf"
        "github.com/apache/yunikorn-k8shim/pkg/dispatcher"
+       "github.com/apache/yunikorn-k8shim/pkg/locking"
        "github.com/apache/yunikorn-k8shim/pkg/log"
        "github.com/apache/yunikorn-scheduler-interface/lib/go/api"
        "github.com/apache/yunikorn-scheduler-interface/lib/go/si"
@@ -44,7 +44,7 @@ type KubernetesShim struct {
        phManager            *cache.PlaceholderManager
        callback             api.ResourceManagerCallback
        stopChan             chan struct{}
-       lock                 *sync.RWMutex
+       lock                 *locking.RWMutex
        outstandingAppsFound bool
 }
 
@@ -86,7 +86,7 @@ func newShimSchedulerInternal(ctx *cache.Context, apiFactory client.APIProvider,
                phManager:            
cache.NewPlaceholderManager(apiFactory.GetAPIs()),
                callback:             cb,
                stopChan:             make(chan struct{}),
-               lock:                 &sync.RWMutex{},
+               lock:                 &locking.RWMutex{},
                outstandingAppsFound: false,
        }
        // init dispatcher
diff --git a/pkg/shim/scheduler_perf_test.go b/pkg/shim/scheduler_perf_test.go
index 940a1b2c..d3858399 100644
--- a/pkg/shim/scheduler_perf_test.go
+++ b/pkg/shim/scheduler_perf_test.go
@@ -26,7 +26,6 @@ import (
        "runtime/pprof"
        "strconv"
        "strings"
-       "sync"
        "sync/atomic"
        "testing"
        "time"
@@ -40,6 +39,7 @@ import (
        "k8s.io/apimachinery/pkg/util/wait"
 
        "github.com/apache/yunikorn-k8shim/pkg/common/constants"
+       "github.com/apache/yunikorn-k8shim/pkg/locking"
        "github.com/apache/yunikorn-k8shim/pkg/log"
 )
 
@@ -170,7 +170,7 @@ func addPodsToCluster(cluster *MockScheduler) map[string]*v1.Pod {
 
 type metricsCollector struct {
        podsPerSec []float64
-       sync.Mutex
+       locking.Mutex
 }
 
 func (m *metricsCollector) collectData() {
diff --git a/test/e2e/framework/helpers/k8s/k8s_utils.go b/test/e2e/framework/helpers/k8s/k8s_utils.go
index 749831d1..b97b35bc 100644
--- a/test/e2e/framework/helpers/k8s/k8s_utils.go
+++ b/test/e2e/framework/helpers/k8s/k8s_utils.go
@@ -27,7 +27,6 @@ import (
        "os/exec"
        "path/filepath"
        "strings"
-       "sync"
        "time"
 
        "github.com/onsi/ginkgo/v2"
@@ -57,6 +56,7 @@ import (
        podutil "k8s.io/kubernetes/pkg/api/v1/pod"
 
        "github.com/apache/yunikorn-k8shim/pkg/common/utils"
+       "github.com/apache/yunikorn-k8shim/pkg/locking"
        "github.com/apache/yunikorn-k8shim/test/e2e/framework/configmanager"
        "github.com/apache/yunikorn-k8shim/test/e2e/framework/helpers/common"
 )
@@ -75,7 +75,7 @@ const (
 )
 
 var fw *portforward.PortForwarder
-var lock = &sync.Mutex{}
+var lock = &locking.Mutex{}
 
 type KubeCtl struct {
        clientSet      *kubernetes.Clientset


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to