This is an automated email from the ASF dual-hosted git repository.
ccondit pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git
The following commit(s) were added to refs/heads/branch-1.5 by this push:
new c94a7d93 [YUNIKORN-2539] Core: Add deadlock tracking feature (#835)
c94a7d93 is described below
commit c94a7d938c418b6a44ba46e0934782a568e3d69d
Author: Craig Condit <[email protected]>
AuthorDate: Fri Apr 5 10:31:13 2024 -0500
[YUNIKORN-2539] Core: Add deadlock tracking feature (#835)
Replaces sync.{RW}Mutex with internal locking.{RW}Mutex implementations.
The new implementation wraps the go-deadlock library with logic to
conditionally enable deadlock detection based on the presence of
environment variables:
To enable the feature:
- DEADLOCK_DETECTION_ENABLED=true
To customize the timeout before potential deadlocks are logged (default
is 60 seconds):
- DEADLOCK_TIMEOUT_SECONDS=60
See https://github.com/sasha-s/go-deadlock for more details.
Closes: #835
---
go.mod | 3 +
go.sum | 4 +
pkg/common/configs/configs.go | 8 +-
pkg/common/resources/tracked_resources.go | 5 +-
pkg/common/security/usergroup.go | 3 +-
pkg/events/event_ringbuffer.go | 4 +-
pkg/events/event_store.go | 5 +-
pkg/events/event_streaming.go | 4 +-
pkg/events/event_system.go | 3 +-
pkg/examples/simple_example.go | 5 +-
pkg/locking/locking.go | 117 +++++++++++++
.../types.go => locking/locking_race_test.go} | 27 ++-
pkg/locking/locking_test.go | 193 +++++++++++++++++++++
pkg/log/logger.go | 3 +-
pkg/log/logger_test.go | 2 +-
pkg/metrics/history/internal_metrics.go | 5 +-
pkg/metrics/init.go | 6 +-
pkg/metrics/scheduler.go | 6 +-
pkg/mock/container_state_updater.go | 5 +-
pkg/mock/event_plugin.go | 5 +-
pkg/plugins/types.go | 5 +-
pkg/rmproxy/rmproxy.go | 4 +-
pkg/rmproxy/rmproxy_mock.go | 5 +-
pkg/scheduler/context.go | 4 +-
pkg/scheduler/health_checker.go | 4 +-
pkg/scheduler/objects/allocation.go | 4 +-
pkg/scheduler/objects/allocation_ask.go | 4 +-
pkg/scheduler/objects/application.go | 3 +-
pkg/scheduler/objects/node.go | 4 +-
pkg/scheduler/objects/node_collection.go | 4 +-
pkg/scheduler/objects/queue.go | 4 +-
pkg/scheduler/objects/required_node_preemptor.go | 4 +-
pkg/scheduler/partition.go | 4 +-
pkg/scheduler/placement/placement.go | 4 +-
pkg/scheduler/tests/mock_rm_callback_test.go | 4 +-
pkg/scheduler/ugm/group_tracker.go | 4 +-
pkg/scheduler/ugm/manager.go | 3 +-
pkg/scheduler/ugm/user_tracker.go | 4 +-
pkg/webservice/dao/config_info.go | 2 +
pkg/webservice/handlers.go | 7 +-
pkg/webservice/state_dump.go | 4 +-
pkg/webservice/streaming_limit.go | 4 +-
42 files changed, 420 insertions(+), 82 deletions(-)
diff --git a/go.mod b/go.mod
index 0f40b1b8..ca4f4674 100644
--- a/go.mod
+++ b/go.mod
@@ -31,6 +31,7 @@ require (
github.com/prometheus/client_golang v1.18.0
github.com/prometheus/client_model v0.5.0
github.com/prometheus/common v0.45.0
+ github.com/sasha-s/go-deadlock v0.3.1
go.uber.org/zap v1.26.0
golang.org/x/net v0.21.0
golang.org/x/time v0.5.0
@@ -45,6 +46,7 @@ require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 // indirect
+ github.com/petermattis/goid v0.0.0-20240327183114-c42a807a84ba //
indirect
github.com/prometheus/procfs v0.12.0 // indirect
go.uber.org/multierr v1.10.0 // indirect
golang.org/x/sys v0.17.0 // indirect
@@ -54,6 +56,7 @@ require (
)
replace (
+ github.com/petermattis/goid => github.com/petermattis/goid
v0.0.0-20240327183114-c42a807a84ba
golang.org/x/crypto => golang.org/x/crypto v0.19.0
golang.org/x/net => golang.org/x/net v0.21.0
golang.org/x/sys => golang.org/x/sys v0.17.0
diff --git a/go.sum b/go.sum
index 3bf3e91c..c39d5be9 100644
--- a/go.sum
+++ b/go.sum
@@ -26,6 +26,8 @@ github.com/looplab/fsm v1.0.1
h1:OEW0ORrIx095N/6lgoGkFkotqH6s7vaFPsgjLAaF5QU=
github.com/looplab/fsm v1.0.1/go.mod
h1:PmD3fFvQEIsjMEfvZdrCDZ6y8VwKTwWNjlpEr6IKPO4=
github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0
h1:jWpvCLoY8Z/e3VKvlsiIGKtc+UG6U5vzxaoagmhXfyg=
github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0/go.mod
h1:QUyp042oQthUoa9bqDv0ER0wrtXnBruoNd7aNjkbP+k=
+github.com/petermattis/goid v0.0.0-20240327183114-c42a807a84ba
h1:3jPgmsFGBID1wFfU2AbYocNcN4wqU68UaHSdMjiw/7U=
+github.com/petermattis/goid v0.0.0-20240327183114-c42a807a84ba/go.mod
h1:pxMtw7cyUw6B2bRH0ZBANSPg+AoSud1I1iyJHI69jH4=
github.com/pmezard/go-difflib v1.0.0
h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod
h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_golang v1.18.0
h1:HzFfmkOzH5Q8L8G+kSJKUx5dtG87sewO+FoDDqP5Tbk=
@@ -38,6 +40,8 @@ github.com/prometheus/procfs v0.12.0
h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k
github.com/prometheus/procfs v0.12.0/go.mod
h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo=
github.com/rogpeppe/go-internal v1.10.0
h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
github.com/rogpeppe/go-internal v1.10.0/go.mod
h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog=
+github.com/sasha-s/go-deadlock v0.3.1
h1:sqv7fDNShgjcaxkO0JNcOAlr8B9+cV5Ey/OB71efZx0=
+github.com/sasha-s/go-deadlock v0.3.1/go.mod
h1:F73l+cr82YSh10GxyRI6qZiCgK64VaZjwesgfQ1/iLM=
github.com/stretchr/testify v1.8.1
h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
github.com/stretchr/testify v1.8.1/go.mod
h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
go.uber.org/goleak v1.2.0 h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk=
diff --git a/pkg/common/configs/configs.go b/pkg/common/configs/configs.go
index 4671c896..8d591afe 100644
--- a/pkg/common/configs/configs.go
+++ b/pkg/common/configs/configs.go
@@ -19,9 +19,9 @@
package configs
import (
- "sync"
"time"
+ "github.com/apache/yunikorn-core/pkg/locking"
"github.com/apache/yunikorn-core/pkg/log"
)
@@ -52,14 +52,14 @@ var ConfigContext *SchedulerConfigContext
var configMap map[string]string
var configMapCallbacks map[string]func()
-var configMapLock sync.RWMutex
+var configMapLock locking.RWMutex
func init() {
configMap = make(map[string]string)
configMapCallbacks = make(map[string]func())
ConfigContext = &SchedulerConfigContext{
configs: make(map[string]*SchedulerConfig),
- lock: &sync.RWMutex{},
+ lock: &locking.RWMutex{},
}
// add a callback to reconfigure logging
@@ -71,7 +71,7 @@ func init() {
// scheduler config context provides thread-safe access for scheduler
configurations
type SchedulerConfigContext struct {
configs map[string]*SchedulerConfig
- lock *sync.RWMutex
+ lock *locking.RWMutex
}
func (ctx *SchedulerConfigContext) Set(policyGroup string, config
*SchedulerConfig) {
diff --git a/pkg/common/resources/tracked_resources.go
b/pkg/common/resources/tracked_resources.go
index 313e67e0..5198c337 100644
--- a/pkg/common/resources/tracked_resources.go
+++ b/pkg/common/resources/tracked_resources.go
@@ -21,8 +21,9 @@ package resources
import (
"fmt"
"strings"
- "sync"
"time"
+
+ "github.com/apache/yunikorn-core/pkg/locking"
)
// TrackedResource is a utility struct to keep track of application resource
usage.
@@ -32,7 +33,7 @@ type TrackedResource struct {
// resource type (CPU, memory, etc.) -> aggregated used time (in
seconds) of the resource type.
TrackedResourceMap map[string]map[string]int64
- sync.RWMutex
+ locking.RWMutex
}
// NewTrackedResource creates a new instance of TrackedResource.
diff --git a/pkg/common/security/usergroup.go b/pkg/common/security/usergroup.go
index 9bd16981..ac181ebb 100644
--- a/pkg/common/security/usergroup.go
+++ b/pkg/common/security/usergroup.go
@@ -28,6 +28,7 @@ import (
"go.uber.org/zap"
"github.com/apache/yunikorn-core/pkg/common"
+ "github.com/apache/yunikorn-core/pkg/locking"
"github.com/apache/yunikorn-core/pkg/log"
"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
)
@@ -46,7 +47,7 @@ var stopped atomic.Bool // whether UserGroupCache is
stopped (needed for mu
// Cache for the user entries.
type UserGroupCache struct {
- lock sync.RWMutex
+ lock locking.RWMutex
interval time.Duration
ugs map[string]*UserGroup
// methods that allow mocking of the class or extending to use non OS
solutions
diff --git a/pkg/events/event_ringbuffer.go b/pkg/events/event_ringbuffer.go
index 4ddea827..72a824d1 100644
--- a/pkg/events/event_ringbuffer.go
+++ b/pkg/events/event_ringbuffer.go
@@ -20,10 +20,10 @@ package events
import (
"strconv"
- "sync"
"go.uber.org/zap"
+ "github.com/apache/yunikorn-core/pkg/locking"
"github.com/apache/yunikorn-core/pkg/log"
"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
)
@@ -51,7 +51,7 @@ type eventRingBuffer struct {
lowestId uint64 // lowest id of an event record available in the
buffer at any given time
resizeOffset uint64 // used to aid the calculation of id->pos after
resize (see id2pos)
- sync.RWMutex
+ locking.RWMutex
}
// Add adds an event to the ring buffer. If the buffer is full, the oldest
element is overwritten.
diff --git a/pkg/events/event_store.go b/pkg/events/event_store.go
index 638cea8a..ff27079b 100644
--- a/pkg/events/event_store.go
+++ b/pkg/events/event_store.go
@@ -19,8 +19,7 @@
package events
import (
- "sync"
-
+ "github.com/apache/yunikorn-core/pkg/locking"
"github.com/apache/yunikorn-core/pkg/metrics"
"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
)
@@ -36,7 +35,7 @@ const defaultEventStoreSize = 1000
type EventStore struct {
events []*si.EventRecord
idx int // points where to store the next event
- sync.RWMutex
+ locking.RWMutex
}
func newEventStore() *EventStore {
diff --git a/pkg/events/event_streaming.go b/pkg/events/event_streaming.go
index a43d1fa5..652bef20 100644
--- a/pkg/events/event_streaming.go
+++ b/pkg/events/event_streaming.go
@@ -19,11 +19,11 @@
package events
import (
- "sync"
"time"
"go.uber.org/zap"
+ "github.com/apache/yunikorn-core/pkg/locking"
"github.com/apache/yunikorn-core/pkg/log"
"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
)
@@ -36,7 +36,7 @@ type EventStreaming struct {
buffer *eventRingBuffer
stopCh chan struct{}
eventStreams map[*EventStream]eventConsumerDetails
- sync.RWMutex
+ locking.RWMutex
}
type eventConsumerDetails struct {
diff --git a/pkg/events/event_system.go b/pkg/events/event_system.go
index 70ce2f26..86fbd8d2 100644
--- a/pkg/events/event_system.go
+++ b/pkg/events/event_system.go
@@ -25,6 +25,7 @@ import (
"github.com/apache/yunikorn-core/pkg/common"
"github.com/apache/yunikorn-core/pkg/common/configs"
+ "github.com/apache/yunikorn-core/pkg/locking"
"github.com/apache/yunikorn-core/pkg/log"
"github.com/apache/yunikorn-core/pkg/metrics"
"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
@@ -94,7 +95,7 @@ type EventSystemImpl struct {
requestCapacity int
ringBufferCapacity uint64
- sync.RWMutex
+ locking.RWMutex
}
// CreateEventStream creates an event stream. See the interface for details.
diff --git a/pkg/examples/simple_example.go b/pkg/examples/simple_example.go
index 077be0d6..590cf6fd 100644
--- a/pkg/examples/simple_example.go
+++ b/pkg/examples/simple_example.go
@@ -19,9 +19,8 @@
package examples
import (
- "sync"
-
"github.com/apache/yunikorn-core/pkg/entrypoint"
+ "github.com/apache/yunikorn-core/pkg/locking"
"github.com/apache/yunikorn-core/pkg/mock"
"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
)
@@ -35,7 +34,7 @@ type exampleRMCallback struct {
nodeAllocations map[string][]*si.Allocation
Allocations map[string]*si.Allocation
- sync.RWMutex
+ locking.RWMutex
}
func (m *exampleRMCallback) UpdateAllocation(response *si.AllocationResponse)
error {
diff --git a/pkg/locking/locking.go b/pkg/locking/locking.go
new file mode 100644
index 00000000..598a1ffb
--- /dev/null
+++ b/pkg/locking/locking.go
@@ -0,0 +1,117 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package locking
+
+import (
+ "fmt"
+ "os"
+ "strconv"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ godeadlock "github.com/sasha-s/go-deadlock"
+
+ "github.com/apache/yunikorn-core/pkg/log"
+)
+
+const (
+	// EnvDeadlockDetectionEnabled names the environment variable that turns
+	// deadlock detection on when its value parses as true (see reInit).
+	EnvDeadlockDetectionEnabled = "DEADLOCK_DETECTION_ENABLED"
+	// EnvDeadlockTimeoutSeconds names the environment variable holding the
+	// number of seconds passed to go-deadlock as its DeadlockTimeout.
+	EnvDeadlockTimeoutSeconds = "DEADLOCK_TIMEOUT_SECONDS"
+)
+
+var (
+	once             sync.Once    // guards the initial reInit call from init
+	trackingEnabled  atomic.Bool  // written by reInit and SetTrackingEnabled
+	timeoutSeconds   atomic.Int32 // timeout value last stored by reInit
+	deadlockDetected atomic.Bool  // set once onPotentialDeadlock has fired
+)
+
+// errorBuf accumulates the text go-deadlock writes to Opts.LogBuf so that
+// onPotentialDeadlock can forward it to the Diagnostics logger. The embedded
+// mutex guards data.
+type errorBuf struct {
+	data string
+	sync.Mutex
+}
+
+// Write implements io.Writer. It always reports the full length of p as
+// written with a nil error; on a nil receiver the bytes are discarded.
+func (b *errorBuf) Write(p []byte) (n int, err error) {
+	n = len(p)
+	if b != nil {
+		b.appendString(string(p))
+	}
+	return n, nil
+}
+
+// appendString adds s to the accumulated text while holding the buffer's lock.
+func (b *errorBuf) appendString(s string) {
+	b.Lock()
+	defer b.Unlock()
+	b.data += s
+}
+
+func init() {
+	once.Do(reInit)
+}
+
+// defaultTimeoutSeconds is the deadlock timeout used when
+// DEADLOCK_TIMEOUT_SECONDS is unset, unparsable, or not a positive number.
+const defaultTimeoutSeconds = 60
+
+// reInit (re)reads the deadlock-detection environment variables and applies
+// them to the package state and to the global go-deadlock options.
+//
+// DEADLOCK_DETECTION_ENABLED is parsed with strconv.ParseBool; a missing or
+// invalid value leaves detection disabled. DEADLOCK_TIMEOUT_SECONDS is parsed
+// as a base-10 32-bit integer; a missing, invalid, zero or negative value
+// falls back to defaultTimeoutSeconds. Every call installs a fresh, empty
+// errorBuf as go-deadlock's LogBuf and onPotentialDeadlock as its callback.
+//
+// NOTE(review): godeadlock.Opts is written without synchronisation, so this
+// is presumably only safe while no locks are in use (init, or between tests).
+func reInit() {
+	// ParseBool already returns false on error, so no separate error branch
+	// is needed to default to "disabled".
+	enabled, _ := strconv.ParseBool(os.Getenv(EnvDeadlockDetectionEnabled))
+	trackingEnabled.Store(enabled)
+
+	timeoutSec, err := strconv.ParseInt(os.Getenv(EnvDeadlockTimeoutSeconds), 10, 32)
+	if err != nil || timeoutSec <= 0 {
+		// go-deadlock only applies its lock timeout when DeadlockTimeout > 0;
+		// accepting 0 or a negative value would silently switch off timeout
+		// reporting while the banner below still announced it.
+		timeoutSec = defaultTimeoutSeconds
+	}
+	timeoutSeconds.Store(int32(timeoutSec))
+
+	godeadlock.Opts.Disable = !enabled
+	godeadlock.Opts.DeadlockTimeout = time.Duration(timeoutSec) * time.Second
+	// Capture go-deadlock's report text so onPotentialDeadlock can emit it
+	// through the Diagnostics logger.
+	godeadlock.Opts.LogBuf = &errorBuf{}
+	godeadlock.Opts.OnPotentialDeadlock = onPotentialDeadlock
+	if enabled {
+		fmt.Fprintf(os.Stderr, "=== Deadlock detection enabled (timeout: %d seconds) ===\n", timeoutSec)
+	}
+}
+
+func onPotentialDeadlock() {
+ deadlockDetected.Store(true)
+ buf, ok := godeadlock.Opts.LogBuf.(*errorBuf)
+ buf.Lock()
+ defer buf.Unlock()
+ if !ok {
+ log.Log(log.Diagnostics).Error("POTENTIAL DEADLOCK: No details
available")
+ } else {
+ log.Log(log.Diagnostics).Error(buf.data)
+ }
+ buf.data = ""
+}
+
+// SetTrackingEnabled overrides the flag reported by IsTrackingEnabled.
+// It only stores that flag; it does not touch godeadlock.Opts, so it does not
+// by itself switch go-deadlock's detection on or off.
+func SetTrackingEnabled(enabled bool) {
+	trackingEnabled.Store(enabled)
+}
+
+// IsTrackingEnabled reports the tracking flag last set by reInit or
+// SetTrackingEnabled.
+func IsTrackingEnabled() bool {
+	enabled := trackingEnabled.Load()
+	return enabled
+}
+
+// GetDeadlockTimeoutSeconds returns the deadlock timeout, in seconds, last
+// stored by reInit.
+func GetDeadlockTimeoutSeconds() int {
+	seconds := timeoutSeconds.Load()
+	return int(seconds)
+}
+
+// IsDeadlockDetected reports whether onPotentialDeadlock has fired since the
+// flag was last cleared.
+func IsDeadlockDetected() bool {
+	detected := deadlockDetected.Load()
+	return detected
+}
+
+type (
+	// Mutex is the project's mutual-exclusion lock. It embeds go-deadlock's
+	// Mutex, whose checks are switched on or off via godeadlock.Opts.Disable
+	// (set from DEADLOCK_DETECTION_ENABLED in reInit).
+	Mutex struct {
+		godeadlock.Mutex
+	}
+
+	// RWMutex is the project's reader/writer lock. It embeds go-deadlock's
+	// RWMutex and is governed by the same godeadlock.Opts settings as Mutex.
+	RWMutex struct {
+		godeadlock.RWMutex
+	}
+)
diff --git a/pkg/plugins/types.go b/pkg/locking/locking_race_test.go
similarity index 61%
copy from pkg/plugins/types.go
copy to pkg/locking/locking_race_test.go
index 114c97d8..c1930150 100644
--- a/pkg/plugins/types.go
+++ b/pkg/locking/locking_race_test.go
@@ -1,3 +1,5 @@
+//go:build !race
+
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
@@ -16,17 +18,28 @@
limitations under the License.
*/
-package plugins
+//nolint:staticcheck
+package locking
import (
- "sync"
+ "testing"
+ "time"
- "github.com/apache/yunikorn-scheduler-interface/lib/go/api"
+ "gotest.tools/v3/assert"
)
-type SchedulerPlugins struct {
- ResourceManagerCallbackPlugin api.ResourceManagerCallback
- StateDumpPlugin api.StateDumpPlugin
+// TestDeadlockDetection enables tracking with a 1 second timeout (via
+// enableTracking), makes a goroutine lock the same Mutex twice, and checks
+// that the potential deadlock is reported through IsDeadlockDetected.
+func TestDeadlockDetection(t *testing.T) {
+ enableTracking()
+ // clear any detection left over from an earlier test
+ deadlockDetected.Store(false)
+ defer disableTracking()
- sync.RWMutex
+ var mutex Mutex
+ go func() {
+ mutex.Lock()
+ mutex.Lock() // will deadlock
+ mutex.Unlock() // will unwind second lock
+ }()
+ // wait longer than the 1 second timeout set by enableTracking so the
+ // blocked second Lock can be reported
+ time.Sleep(2 * time.Second)
+ mutex.Unlock() // will unwind first lock
+ assert.Assert(t, IsDeadlockDetected(), "Deadlock should have been
detected")
}
diff --git a/pkg/locking/locking_test.go b/pkg/locking/locking_test.go
new file mode 100644
index 00000000..cb62d5d2
--- /dev/null
+++ b/pkg/locking/locking_test.go
@@ -0,0 +1,193 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+//nolint:staticcheck
+package locking
+
+import (
+ "os"
+ "sync"
+ "sync/atomic"
+ "testing"
+ "time"
+
+ "gotest.tools/v3/assert"
+
+ godeadlock "github.com/sasha-s/go-deadlock"
+)
+
+// disableTracking unsets both deadlock environment variables and re-applies
+// the configuration through reInit, leaving detection off.
+func disableTracking() {
+	for _, name := range []string{EnvDeadlockDetectionEnabled, EnvDeadlockTimeoutSeconds} {
+		os.Unsetenv(name)
+	}
+	reInit()
+}
+
+// enableTracking sets the environment to enable detection with a 1 second
+// timeout and re-applies the configuration through reInit.
+func enableTracking() {
+	vars := [][2]string{
+		{EnvDeadlockDetectionEnabled, "true"},
+		{EnvDeadlockTimeoutSeconds, "1"},
+	}
+	for _, kv := range vars {
+		os.Setenv(kv[0], kv[1])
+	}
+	reInit()
+}
+
+// The benchmarks below compare plain sync locks, raw go-deadlock locks, and
+// the locking package's wrappers with tracking disabled and enabled. Each one
+// that reconfigures tracking calls b.ResetTimer afterwards, so the reInit
+// setup (which may also print a banner to stderr) is not counted.
+
+func BenchmarkSyncMutex(b *testing.B) {
+	lock := sync.Mutex{}
+	for i := 0; i < b.N; i++ {
+		lock.Lock()
+		lock.Unlock()
+	}
+}
+
+func BenchmarkSyncRWMutexRead(b *testing.B) {
+	lock := sync.RWMutex{}
+	for i := 0; i < b.N; i++ {
+		lock.RLock()
+		lock.RUnlock()
+	}
+}
+
+func BenchmarkSyncRWMutexWrite(b *testing.B) {
+	lock := sync.RWMutex{}
+	for i := 0; i < b.N; i++ {
+		lock.Lock()
+		lock.Unlock()
+	}
+}
+
+func BenchmarkGoDeadlockMutex(b *testing.B) {
+	enableTracking()
+	defer disableTracking()
+	lock := godeadlock.Mutex{}
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		lock.Lock()
+		lock.Unlock()
+	}
+}
+
+func BenchmarkGoDeadlockRWMutexRead(b *testing.B) {
+	enableTracking()
+	defer disableTracking()
+	lock := godeadlock.RWMutex{}
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		lock.RLock()
+		lock.RUnlock()
+	}
+}
+
+func BenchmarkGoDeadlockRWMutexWrite(b *testing.B) {
+	enableTracking()
+	defer disableTracking()
+	lock := godeadlock.RWMutex{}
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		lock.Lock()
+		lock.Unlock()
+	}
+}
+
+func BenchmarkUntrackedMutex(b *testing.B) {
+	disableTracking()
+	var lock Mutex
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		lock.Lock()
+		lock.Unlock()
+	}
+}
+
+func BenchmarkUntrackedRWMutexRead(b *testing.B) {
+	disableTracking()
+	var lock RWMutex
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		lock.RLock()
+		lock.RUnlock()
+	}
+}
+
+func BenchmarkUntrackedRWMutexWrite(b *testing.B) {
+	disableTracking()
+	var lock RWMutex
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		lock.Lock()
+		lock.Unlock()
+	}
+}
+
+func BenchmarkTrackedMutex(b *testing.B) {
+	enableTracking()
+	defer disableTracking()
+	var lock Mutex
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		lock.Lock()
+		lock.Unlock()
+	}
+}
+
+func BenchmarkTrackedRWMutexRead(b *testing.B) {
+	enableTracking()
+	defer disableTracking()
+	var lock RWMutex
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		lock.RLock()
+		lock.RUnlock()
+	}
+}
+
+func BenchmarkTrackedRWMutexWrite(b *testing.B) {
+	enableTracking()
+	defer disableTracking()
+	var lock RWMutex
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		lock.Lock()
+		lock.Unlock()
+	}
+}
+
+// TestMutex checks that Mutex provides mutual exclusion. A goroutine that
+// tries to lock while the test holds the lock can only store 2 after the test
+// has stored 1 and unlocked, so the final value must be 2.
+func TestMutex(t *testing.T) {
+	var mutex Mutex
+	var result atomic.Int32
+	done := make(chan struct{})
+	mutex.Lock()
+	go func() {
+		defer close(done)
+		mutex.Lock()
+		result.Store(2)
+		mutex.Unlock()
+	}()
+	// give the goroutine a chance to block on Lock while we hold it
+	time.Sleep(100 * time.Millisecond)
+	result.Store(1)
+	mutex.Unlock()
+	// wait for the goroutine to finish instead of sleeping a fixed time,
+	// which could assert before it ran on a slow or loaded machine
+	select {
+	case <-done:
+	case <-time.After(5 * time.Second):
+		t.Fatal("goroutine did not acquire the mutex after it was released")
+	}
+	assert.Equal(t, int32(2), result.Load())
+}
+
+// TestRWMutex checks that writers are excluded while a read lock is held. Two
+// writers must not have run while the test holds RLock, and both must
+// complete once it is released.
+func TestRWMutex(t *testing.T) {
+	var mutex RWMutex
+	var count atomic.Int32
+	var writers sync.WaitGroup
+	mutex.RLock()
+	for i := 0; i < 2; i++ {
+		writers.Add(1)
+		go func() {
+			defer writers.Done()
+			mutex.Lock()
+			count.Add(1)
+			mutex.Unlock()
+		}()
+	}
+	// give the writers a chance to block on Lock while the read lock is held
+	time.Sleep(100 * time.Millisecond)
+	before := count.Load()
+	mutex.RUnlock()
+	// wait for both writers instead of sleeping a fixed time, which could
+	// assert before they ran on a slow or loaded machine
+	done := make(chan struct{})
+	go func() {
+		writers.Wait()
+		close(done)
+	}()
+	select {
+	case <-done:
+	case <-time.After(5 * time.Second):
+		t.Fatal("writers did not acquire the lock after the read lock was released")
+	}
+	after := count.Load()
+	assert.Equal(t, before, int32(0))
+	assert.Equal(t, after, int32(2))
+}
diff --git a/pkg/log/logger.go b/pkg/log/logger.go
index 5d9f861d..8802fc9d 100644
--- a/pkg/log/logger.go
+++ b/pkg/log/logger.go
@@ -82,13 +82,14 @@ var (
SchedNodesUsage = &LoggerHandle{id: 25, name:
"core.scheduler.nodesusage"}
Security = &LoggerHandle{id: 26, name: "core.security"}
Utils = &LoggerHandle{id: 27, name: "core.utils"}
+ Diagnostics = &LoggerHandle{id: 28, name: "core.diagnostics"}
)
// this tracks all the known logger handles, used to preallocate the real
logger instances when configuration changes
var loggers = []*LoggerHandle{
Core, Test, Deprecation, Config, Entrypoint, Events, OpenTracing,
Resources, REST, RMProxy, RPC, Metrics,
Scheduler, SchedAllocation, SchedApplication, SchedAppUsage,
SchedContext, SchedFSM, SchedHealth, SchedNode,
- SchedPartition, SchedPreemption, SchedQueue, SchedReservation,
SchedUGM, SchedNodesUsage, Security, Utils,
+ SchedPartition, SchedPreemption, SchedQueue, SchedReservation,
SchedUGM, SchedNodesUsage, Security, Utils, Diagnostics,
}
// structure to hold all current logger configuration state
diff --git a/pkg/log/logger_test.go b/pkg/log/logger_test.go
index e991bd0c..48160d05 100644
--- a/pkg/log/logger_test.go
+++ b/pkg/log/logger_test.go
@@ -39,7 +39,7 @@ func TestLoggerIds(t *testing.T) {
_ = Log(Test)
// validate logger count
- assert.Equal(t, 28, len(loggers), "wrong logger count")
+ assert.Equal(t, 29, len(loggers), "wrong logger count")
// validate that all loggers are populated and have sequential ids
for i := 0; i < len(loggers); i++ {
diff --git a/pkg/metrics/history/internal_metrics.go
b/pkg/metrics/history/internal_metrics.go
index 470349fc..90c4a136 100644
--- a/pkg/metrics/history/internal_metrics.go
+++ b/pkg/metrics/history/internal_metrics.go
@@ -19,8 +19,9 @@
package history
import (
- "sync"
"time"
+
+ "github.com/apache/yunikorn-core/pkg/locking"
)
// This class collects basic information about the cluster
@@ -33,7 +34,7 @@ type InternalMetricsHistory struct {
// internal implementation of limited array
pointer int
- sync.RWMutex
+ locking.RWMutex
}
type MetricsRecord struct {
diff --git a/pkg/metrics/init.go b/pkg/metrics/init.go
index 39b2120e..ce478014 100644
--- a/pkg/metrics/init.go
+++ b/pkg/metrics/init.go
@@ -20,6 +20,8 @@ package metrics
import (
"sync"
+
+ "github.com/apache/yunikorn-core/pkg/locking"
)
const (
@@ -41,7 +43,7 @@ type Metrics struct {
queues map[string]*QueueMetrics
event *EventMetrics
runtime *RuntimeMetrics
- lock sync.RWMutex
+ lock locking.RWMutex
}
func init() {
@@ -50,7 +52,7 @@ func init() {
scheduler: InitSchedulerMetrics(),
queues: make(map[string]*QueueMetrics),
event: initEventMetrics(),
- lock: sync.RWMutex{},
+ lock: locking.RWMutex{},
runtime: initRuntimeMetrics(),
}
})
diff --git a/pkg/metrics/scheduler.go b/pkg/metrics/scheduler.go
index ad31c755..fd566a6d 100644
--- a/pkg/metrics/scheduler.go
+++ b/pkg/metrics/scheduler.go
@@ -20,13 +20,13 @@ package metrics
import (
"fmt"
- "sync"
"time"
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
"go.uber.org/zap"
+ "github.com/apache/yunikorn-core/pkg/locking"
"github.com/apache/yunikorn-core/pkg/log"
)
@@ -66,13 +66,13 @@ type SchedulerMetrics struct {
sortingLatency *prometheus.HistogramVec
tryNodeLatency prometheus.Histogram
tryPreemptionLatency prometheus.Histogram
- lock sync.RWMutex
+ lock locking.RWMutex
}
// InitSchedulerMetrics to initialize scheduler metrics
func InitSchedulerMetrics() *SchedulerMetrics {
s := &SchedulerMetrics{
- lock: sync.RWMutex{},
+ lock: locking.RWMutex{},
}
s.nodeResourceUsage = make(map[string]*prometheus.GaugeVec) // Note:
This map might be updated at runtime
diff --git a/pkg/mock/container_state_updater.go
b/pkg/mock/container_state_updater.go
index e5890032..fd2160ac 100644
--- a/pkg/mock/container_state_updater.go
+++ b/pkg/mock/container_state_updater.go
@@ -19,15 +19,14 @@
package mock
import (
- "sync"
-
+ "github.com/apache/yunikorn-core/pkg/locking"
"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
)
type ContainerStateUpdater struct {
ResourceManagerCallback
sentUpdate *si.UpdateContainerSchedulingStateRequest
- sync.RWMutex
+ locking.RWMutex
}
func (m *ContainerStateUpdater) UpdateContainerSchedulingState(request
*si.UpdateContainerSchedulingStateRequest) {
diff --git a/pkg/mock/event_plugin.go b/pkg/mock/event_plugin.go
index d4584add..0dbed84e 100644
--- a/pkg/mock/event_plugin.go
+++ b/pkg/mock/event_plugin.go
@@ -19,8 +19,7 @@
package mock
import (
- "sync"
-
+ "github.com/apache/yunikorn-core/pkg/locking"
"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
)
@@ -28,7 +27,7 @@ type EventPlugin struct {
ResourceManagerCallback
records chan *si.EventRecord
- sync.Mutex
+ locking.Mutex
}
func (m *EventPlugin) SendEvent(events []*si.EventRecord) {
diff --git a/pkg/plugins/types.go b/pkg/plugins/types.go
index 114c97d8..3a31234f 100644
--- a/pkg/plugins/types.go
+++ b/pkg/plugins/types.go
@@ -19,8 +19,7 @@
package plugins
import (
- "sync"
-
+ "github.com/apache/yunikorn-core/pkg/locking"
"github.com/apache/yunikorn-scheduler-interface/lib/go/api"
)
@@ -28,5 +27,5 @@ type SchedulerPlugins struct {
ResourceManagerCallbackPlugin api.ResourceManagerCallback
StateDumpPlugin api.StateDumpPlugin
- sync.RWMutex
+ locking.RWMutex
}
diff --git a/pkg/rmproxy/rmproxy.go b/pkg/rmproxy/rmproxy.go
index ecf06bea..3215072a 100644
--- a/pkg/rmproxy/rmproxy.go
+++ b/pkg/rmproxy/rmproxy.go
@@ -22,12 +22,12 @@ import (
"fmt"
"reflect"
"strconv"
- "sync"
"go.uber.org/zap"
"github.com/apache/yunikorn-core/pkg/common"
"github.com/apache/yunikorn-core/pkg/handler"
+ "github.com/apache/yunikorn-core/pkg/locking"
"github.com/apache/yunikorn-core/pkg/log"
"github.com/apache/yunikorn-core/pkg/metrics"
"github.com/apache/yunikorn-core/pkg/plugins"
@@ -47,7 +47,7 @@ type RMProxy struct {
rmIDToCallback map[string]api.ResourceManagerCallback
- sync.RWMutex
+ locking.RWMutex
}
func (rmp *RMProxy) GetRMEventHandler() handler.EventHandler {
diff --git a/pkg/rmproxy/rmproxy_mock.go b/pkg/rmproxy/rmproxy_mock.go
index 1c880771..5c629266 100644
--- a/pkg/rmproxy/rmproxy_mock.go
+++ b/pkg/rmproxy/rmproxy_mock.go
@@ -19,8 +19,7 @@
package rmproxy
import (
- "sync"
-
+ "github.com/apache/yunikorn-core/pkg/locking"
"github.com/apache/yunikorn-core/pkg/rmproxy/rmevent"
)
@@ -28,7 +27,7 @@ import (
type MockedRMProxy struct {
handled bool
events []interface{}
- sync.RWMutex
+ locking.RWMutex
}
func NewMockedRMProxy() *MockedRMProxy {
diff --git a/pkg/scheduler/context.go b/pkg/scheduler/context.go
index 1a71b600..7c636d32 100644
--- a/pkg/scheduler/context.go
+++ b/pkg/scheduler/context.go
@@ -22,7 +22,6 @@ import (
"fmt"
"math"
"strconv"
- "sync"
"time"
"go.uber.org/zap"
@@ -31,6 +30,7 @@ import (
"github.com/apache/yunikorn-core/pkg/common/configs"
"github.com/apache/yunikorn-core/pkg/common/resources"
"github.com/apache/yunikorn-core/pkg/handler"
+ "github.com/apache/yunikorn-core/pkg/locking"
"github.com/apache/yunikorn-core/pkg/log"
"github.com/apache/yunikorn-core/pkg/metrics"
"github.com/apache/yunikorn-core/pkg/rmproxy/rmevent"
@@ -55,7 +55,7 @@ type ClusterContext struct {
rmInfo map[string]*RMInformation
startTime time.Time
- sync.RWMutex
+ locking.RWMutex
lastHealthCheckResult *dao.SchedulerHealthDAOInfo
}
diff --git a/pkg/scheduler/health_checker.go b/pkg/scheduler/health_checker.go
index c7657738..e8ba9728 100644
--- a/pkg/scheduler/health_checker.go
+++ b/pkg/scheduler/health_checker.go
@@ -20,13 +20,13 @@ package scheduler
import (
"fmt"
- "sync"
"time"
"go.uber.org/zap"
"github.com/apache/yunikorn-core/pkg/common/configs"
"github.com/apache/yunikorn-core/pkg/common/resources"
+ "github.com/apache/yunikorn-core/pkg/locking"
"github.com/apache/yunikorn-core/pkg/log"
"github.com/apache/yunikorn-core/pkg/metrics"
"github.com/apache/yunikorn-core/pkg/scheduler/objects"
@@ -42,7 +42,7 @@ type HealthChecker struct {
period time.Duration
enabled bool
- sync.RWMutex
+ locking.RWMutex
}
func NewHealthChecker(schedulerContext *ClusterContext) *HealthChecker {
diff --git a/pkg/scheduler/objects/allocation.go
b/pkg/scheduler/objects/allocation.go
index 9fefcb83..8812a891 100644
--- a/pkg/scheduler/objects/allocation.go
+++ b/pkg/scheduler/objects/allocation.go
@@ -21,13 +21,13 @@ package objects
import (
"fmt"
"strconv"
- "sync"
"time"
"go.uber.org/zap"
"github.com/apache/yunikorn-core/pkg/common"
"github.com/apache/yunikorn-core/pkg/common/resources"
+ "github.com/apache/yunikorn-core/pkg/locking"
"github.com/apache/yunikorn-core/pkg/log"
siCommon "github.com/apache/yunikorn-scheduler-interface/lib/go/common"
"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
@@ -74,7 +74,7 @@ type Allocation struct {
preempted bool
instType string
- sync.RWMutex
+ locking.RWMutex
}
func NewAllocation(nodeID string, ask *AllocationAsk) *Allocation {
diff --git a/pkg/scheduler/objects/allocation_ask.go
b/pkg/scheduler/objects/allocation_ask.go
index 181e7f57..6b8bce6e 100644
--- a/pkg/scheduler/objects/allocation_ask.go
+++ b/pkg/scheduler/objects/allocation_ask.go
@@ -20,7 +20,6 @@ package objects
import (
"fmt"
- "sync"
"time"
"go.uber.org/zap"
@@ -28,6 +27,7 @@ import (
"github.com/apache/yunikorn-core/pkg/common"
"github.com/apache/yunikorn-core/pkg/common/resources"
"github.com/apache/yunikorn-core/pkg/events"
+ "github.com/apache/yunikorn-core/pkg/locking"
"github.com/apache/yunikorn-core/pkg/log"
"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
)
@@ -64,7 +64,7 @@ type AllocationAsk struct {
userQuotaCheckFailed bool
headroomCheckFailed bool
- sync.RWMutex
+ locking.RWMutex
}
type AllocationLogEntry struct {
diff --git a/pkg/scheduler/objects/application.go
b/pkg/scheduler/objects/application.go
index a064d321..3eee666d 100644
--- a/pkg/scheduler/objects/application.go
+++ b/pkg/scheduler/objects/application.go
@@ -35,6 +35,7 @@ import (
"github.com/apache/yunikorn-core/pkg/common/security"
"github.com/apache/yunikorn-core/pkg/events"
"github.com/apache/yunikorn-core/pkg/handler"
+ "github.com/apache/yunikorn-core/pkg/locking"
"github.com/apache/yunikorn-core/pkg/log"
"github.com/apache/yunikorn-core/pkg/metrics"
"github.com/apache/yunikorn-core/pkg/rmproxy/rmevent"
@@ -121,7 +122,7 @@ type Application struct {
appEvents *applicationEvents
sendStateChangeEvents bool // whether to send state-change events or
not (simplifies testing)
- sync.RWMutex
+ locking.RWMutex
}
func (sa *Application) GetApplicationSummary(rmID string) *ApplicationSummary {
diff --git a/pkg/scheduler/objects/node.go b/pkg/scheduler/objects/node.go
index edce1281..97039f89 100644
--- a/pkg/scheduler/objects/node.go
+++ b/pkg/scheduler/objects/node.go
@@ -22,12 +22,12 @@ import (
"fmt"
"strconv"
"strings"
- "sync"
"go.uber.org/zap"
"github.com/apache/yunikorn-core/pkg/common/resources"
"github.com/apache/yunikorn-core/pkg/events"
+ "github.com/apache/yunikorn-core/pkg/locking"
"github.com/apache/yunikorn-core/pkg/log"
"github.com/apache/yunikorn-core/pkg/plugins"
"github.com/apache/yunikorn-scheduler-interface/lib/go/common"
@@ -60,7 +60,7 @@ type Node struct {
listeners []NodeListener // a list of node listeners
nodeEvents *nodeEvents
- sync.RWMutex
+ locking.RWMutex
}
func NewNode(proto *si.NodeInfo) *Node {
diff --git a/pkg/scheduler/objects/node_collection.go
b/pkg/scheduler/objects/node_collection.go
index 11a9a35c..f991eec5 100644
--- a/pkg/scheduler/objects/node_collection.go
+++ b/pkg/scheduler/objects/node_collection.go
@@ -20,11 +20,11 @@ package objects
import (
"fmt"
- "sync"
"github.com/google/btree"
"go.uber.org/zap"
+ "github.com/apache/yunikorn-core/pkg/locking"
"github.com/apache/yunikorn-core/pkg/log"
"github.com/apache/yunikorn-core/pkg/scheduler/policies"
)
@@ -79,7 +79,7 @@ type baseNodeCollection struct {
unreservedIterator *treeIterator
fullIterator *treeIterator
- sync.RWMutex
+ locking.RWMutex
}
func (nc *baseNodeCollection) scoreNode(node *Node) float64 {
diff --git a/pkg/scheduler/objects/queue.go b/pkg/scheduler/objects/queue.go
index 43c66e8b..5611d2a8 100644
--- a/pkg/scheduler/objects/queue.go
+++ b/pkg/scheduler/objects/queue.go
@@ -24,7 +24,6 @@ import (
"fmt"
"strconv"
"strings"
- "sync"
"time"
"github.com/looplab/fsm"
@@ -35,6 +34,7 @@ import (
"github.com/apache/yunikorn-core/pkg/common/resources"
"github.com/apache/yunikorn-core/pkg/common/security"
"github.com/apache/yunikorn-core/pkg/events"
+ "github.com/apache/yunikorn-core/pkg/locking"
"github.com/apache/yunikorn-core/pkg/log"
"github.com/apache/yunikorn-core/pkg/metrics"
"github.com/apache/yunikorn-core/pkg/scheduler/objects/template"
@@ -89,7 +89,7 @@ type Queue struct {
template *template.Template
queueEvents *queueEvents
- sync.RWMutex
+ locking.RWMutex
}
// newBlankQueue creates a new empty queue objects with all values initialised.
diff --git a/pkg/scheduler/objects/required_node_preemptor.go b/pkg/scheduler/objects/required_node_preemptor.go
index 0a750714..a43ab677 100644
--- a/pkg/scheduler/objects/required_node_preemptor.go
+++ b/pkg/scheduler/objects/required_node_preemptor.go
@@ -20,9 +20,9 @@ package objects
import (
"sort"
- "sync"
"github.com/apache/yunikorn-core/pkg/common/resources"
+ "github.com/apache/yunikorn-core/pkg/locking"
)
type PreemptionContext struct {
@@ -30,7 +30,7 @@ type PreemptionContext struct {
requiredAsk *AllocationAsk
allocations []*Allocation
- sync.RWMutex
+ locking.RWMutex
}
func NewRequiredNodePreemptor(node *Node, requiredAsk *AllocationAsk)
*PreemptionContext {
diff --git a/pkg/scheduler/partition.go b/pkg/scheduler/partition.go
index c318c6b8..ba2cfb03 100644
--- a/pkg/scheduler/partition.go
+++ b/pkg/scheduler/partition.go
@@ -24,7 +24,6 @@ import (
"math"
"strconv"
"strings"
- "sync"
"time"
"github.com/looplab/fsm"
@@ -34,6 +33,7 @@ import (
"github.com/apache/yunikorn-core/pkg/common/configs"
"github.com/apache/yunikorn-core/pkg/common/resources"
"github.com/apache/yunikorn-core/pkg/common/security"
+ "github.com/apache/yunikorn-core/pkg/locking"
"github.com/apache/yunikorn-core/pkg/log"
"github.com/apache/yunikorn-core/pkg/metrics"
"github.com/apache/yunikorn-core/pkg/scheduler/objects"
@@ -75,7 +75,7 @@ type PartitionContext struct {
// while manipulating the application.
// Similarly adding, updating or removing a node or a queue should only hold the partition write lock
// while manipulating the partition information not while manipulating the underlying objects.
- sync.RWMutex
+ locking.RWMutex
}
func newPartitionContext(conf configs.PartitionConfig, rmID string, cc
*ClusterContext) (*PartitionContext, error) {
diff --git a/pkg/scheduler/placement/placement.go b/pkg/scheduler/placement/placement.go
index b74a4865..08aac0c4 100644
--- a/pkg/scheduler/placement/placement.go
+++ b/pkg/scheduler/placement/placement.go
@@ -21,11 +21,11 @@ package placement
import (
"fmt"
"strings"
- "sync"
"go.uber.org/zap"
"github.com/apache/yunikorn-core/pkg/common/configs"
+ "github.com/apache/yunikorn-core/pkg/locking"
"github.com/apache/yunikorn-core/pkg/log"
"github.com/apache/yunikorn-core/pkg/scheduler/objects"
"github.com/apache/yunikorn-core/pkg/scheduler/placement/types"
@@ -35,7 +35,7 @@ type AppPlacementManager struct {
rules []rule
queueFn func(string) *objects.Queue
- sync.RWMutex
+ locking.RWMutex
}
func NewPlacementManager(rules []configs.PlacementRule, queueFunc func(string)
*objects.Queue) *AppPlacementManager {
diff --git a/pkg/scheduler/tests/mock_rm_callback_test.go b/pkg/scheduler/tests/mock_rm_callback_test.go
index 3af34be9..f9212a00 100644
--- a/pkg/scheduler/tests/mock_rm_callback_test.go
+++ b/pkg/scheduler/tests/mock_rm_callback_test.go
@@ -19,13 +19,13 @@
package tests
import (
- "sync"
"testing"
"time"
"gotest.tools/v3/assert"
"github.com/apache/yunikorn-core/pkg/common"
+ "github.com/apache/yunikorn-core/pkg/locking"
"github.com/apache/yunikorn-core/pkg/mock"
"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
)
@@ -39,7 +39,7 @@ type mockRMCallback struct {
nodeAllocations map[string][]*si.Allocation
Allocations map[string]*si.Allocation
- sync.RWMutex
+ locking.RWMutex
}
func newMockRMCallbackHandler() *mockRMCallback {
diff --git a/pkg/scheduler/ugm/group_tracker.go b/pkg/scheduler/ugm/group_tracker.go
index fc94febd..12ff3259 100644
--- a/pkg/scheduler/ugm/group_tracker.go
+++ b/pkg/scheduler/ugm/group_tracker.go
@@ -20,11 +20,11 @@ package ugm
import (
"strings"
- "sync"
"github.com/apache/yunikorn-core/pkg/common"
"github.com/apache/yunikorn-core/pkg/common/configs"
"github.com/apache/yunikorn-core/pkg/common/resources"
+ "github.com/apache/yunikorn-core/pkg/locking"
"github.com/apache/yunikorn-core/pkg/webservice/dao"
)
@@ -34,7 +34,7 @@ type GroupTracker struct {
queueTracker *QueueTracker // Holds the actual resource usage of
queue path where application run
events *ugmEvents
- sync.RWMutex
+ locking.RWMutex
}
func newGroupTracker(groupName string, events *ugmEvents) *GroupTracker {
diff --git a/pkg/scheduler/ugm/manager.go b/pkg/scheduler/ugm/manager.go
index f0a76d56..c166fbb2 100644
--- a/pkg/scheduler/ugm/manager.go
+++ b/pkg/scheduler/ugm/manager.go
@@ -30,6 +30,7 @@ import (
"github.com/apache/yunikorn-core/pkg/common/resources"
"github.com/apache/yunikorn-core/pkg/common/security"
"github.com/apache/yunikorn-core/pkg/events"
+ "github.com/apache/yunikorn-core/pkg/locking"
"github.com/apache/yunikorn-core/pkg/log"
)
@@ -47,7 +48,7 @@ type Manager struct {
userLimits map[string]map[string]*LimitConfig // Holds
queue path * user limit config
groupLimits map[string]map[string]*LimitConfig // Holds
queue path * group limit config
events *ugmEvents
- sync.RWMutex
+ locking.RWMutex
}
func newManager() *Manager {
diff --git a/pkg/scheduler/ugm/user_tracker.go b/pkg/scheduler/ugm/user_tracker.go
index 1f1ecfee..9a359d5c 100644
--- a/pkg/scheduler/ugm/user_tracker.go
+++ b/pkg/scheduler/ugm/user_tracker.go
@@ -20,13 +20,13 @@ package ugm
import (
"strings"
- "sync"
"go.uber.org/zap"
"github.com/apache/yunikorn-core/pkg/common"
"github.com/apache/yunikorn-core/pkg/common/configs"
"github.com/apache/yunikorn-core/pkg/common/resources"
+ "github.com/apache/yunikorn-core/pkg/locking"
"github.com/apache/yunikorn-core/pkg/log"
"github.com/apache/yunikorn-core/pkg/webservice/dao"
)
@@ -42,7 +42,7 @@ type UserTracker struct {
queueTracker *QueueTracker // Holds the actual resource usage of
queue path where application runs
events *ugmEvents
- sync.RWMutex
+ locking.RWMutex
}
func newUserTracker(userName string, ugmEvents *ugmEvents) *UserTracker {
diff --git a/pkg/webservice/dao/config_info.go b/pkg/webservice/dao/config_info.go
index a530fce9..39fdcd05 100644
--- a/pkg/webservice/dao/config_info.go
+++ b/pkg/webservice/dao/config_info.go
@@ -28,4 +28,6 @@ type ValidateConfResponse struct {
type ConfigDAOInfo struct {
*configs.SchedulerConfig `yaml:",inline"`
Extra map[string]string `yaml:",omitempty"
json:",omitempty"`
+ DeadlockDetectionEnabled bool
+ DeadlockTimeoutSeconds int
}
diff --git a/pkg/webservice/handlers.go b/pkg/webservice/handlers.go
index a10bac53..7cf2d419 100644
--- a/pkg/webservice/handlers.go
+++ b/pkg/webservice/handlers.go
@@ -39,6 +39,7 @@ import (
"github.com/apache/yunikorn-core/pkg/common/configs"
"github.com/apache/yunikorn-core/pkg/common/resources"
"github.com/apache/yunikorn-core/pkg/events"
+ "github.com/apache/yunikorn-core/pkg/locking"
"github.com/apache/yunikorn-core/pkg/log"
metrics2 "github.com/apache/yunikorn-core/pkg/metrics"
"github.com/apache/yunikorn-core/pkg/metrics/history"
@@ -581,8 +582,10 @@ func getClusterConfig(w http.ResponseWriter, r *http.Request) {
func getClusterConfigDAO() *dao.ConfigDAOInfo {
// merge core config with extra config
conf := dao.ConfigDAOInfo{
- SchedulerConfig:
configs.ConfigContext.Get(schedulerContext.GetPolicyGroup()),
- Extra: configs.GetConfigMap(),
+ SchedulerConfig:
configs.ConfigContext.Get(schedulerContext.GetPolicyGroup()),
+ Extra: configs.GetConfigMap(),
+ DeadlockDetectionEnabled: locking.IsTrackingEnabled(),
+ DeadlockTimeoutSeconds: locking.GetDeadlockTimeoutSeconds(),
}
return &conf
diff --git a/pkg/webservice/state_dump.go b/pkg/webservice/state_dump.go
index ca448db3..bb3f98f3 100644
--- a/pkg/webservice/state_dump.go
+++ b/pkg/webservice/state_dump.go
@@ -23,10 +23,10 @@ import (
"io"
"log"
"net/http"
- "sync"
"time"
"github.com/apache/yunikorn-core/pkg/events"
+ "github.com/apache/yunikorn-core/pkg/locking"
yunikornLog "github.com/apache/yunikorn-core/pkg/log"
"github.com/apache/yunikorn-core/pkg/webservice/dao"
)
@@ -35,7 +35,7 @@ const (
stateLogCallDepth = 2
)
-var stateDump sync.Mutex // ensures only one state dump can be handled at a
time
+var stateDump locking.Mutex // ensures only one state dump can be handled at a
time
type AggregatedStateInfo struct {
Timestamp int64
`json:"timestamp,omitempty"`
diff --git a/pkg/webservice/streaming_limit.go b/pkg/webservice/streaming_limit.go
index fe2df79c..94cc91a5 100644
--- a/pkg/webservice/streaming_limit.go
+++ b/pkg/webservice/streaming_limit.go
@@ -21,12 +21,12 @@ package webservice
import (
"fmt"
"strconv"
- "sync"
"sync/atomic"
"go.uber.org/zap"
"github.com/apache/yunikorn-core/pkg/common/configs"
+ "github.com/apache/yunikorn-core/pkg/locking"
"github.com/apache/yunikorn-core/pkg/log"
)
@@ -41,7 +41,7 @@ type StreamingLimiter struct {
maxStreams uint64 // maximum number of event streams
maxPerHostStreams uint64 // maximum number of event streams per host
- sync.Mutex
+ locking.Mutex
}
func NewStreamingLimiter() *StreamingLimiter {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]