This is an automated email from the ASF dual-hosted git repository.
ccondit pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-k8shim.git
The following commit(s) were added to refs/heads/master by this push:
new 4d6b4dec [YUNIKORN-2844] Inject event recorder externally (#922)
4d6b4dec is described below
commit 4d6b4dec2a986a7c2bbb1aa802b90073fd881314
Author: Peter Bacsko <[email protected]>
AuthorDate: Wed Oct 9 09:26:04 2024 -0600
[YUNIKORN-2844] Inject event recorder externally (#922)
Closes: #922
Signed-off-by: Craig Condit <[email protected]>
---
pkg/common/events/recorder.go | 39 +++++++++-----------------------------
pkg/common/events/recorder_test.go | 7 +------
pkg/shim/scheduler.go | 17 +++++++++++++++++
3 files changed, 27 insertions(+), 36 deletions(-)
diff --git a/pkg/common/events/recorder.go b/pkg/common/events/recorder.go
index 48a06fa7..cde9a64f 100644
--- a/pkg/common/events/recorder.go
+++ b/pkg/common/events/recorder.go
@@ -19,43 +19,22 @@
package events
import (
- "sync"
+ "sync/atomic"
- "k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/events"
-
- "github.com/apache/yunikorn-k8shim/pkg/client"
- "github.com/apache/yunikorn-k8shim/pkg/common/constants"
- "github.com/apache/yunikorn-k8shim/pkg/conf"
- "github.com/apache/yunikorn-k8shim/pkg/locking"
)
-var eventRecorder events.EventRecorder = events.NewFakeRecorder(1024)
-var once sync.Once
-var lock locking.RWMutex
+var eventRecorder atomic.Pointer[events.EventRecorder]
+
+func init() {
+ r := events.EventRecorder(NewMockedRecorder())
+ eventRecorder.Store(&r)
+}
func GetRecorder() events.EventRecorder {
- lock.Lock()
- defer lock.Unlock()
- once.Do(func() {
- // note, the initiation of the event recorder requires on a
workable Kubernetes client,
- // in test mode we should skip this and just use a fake
recorder instead.
- configs := conf.GetSchedulerConf()
- if !configs.IsTestMode() {
- k8sClient := client.NewKubeClient(configs.KubeConfig)
- eventBroadcaster :=
events.NewBroadcaster(&events.EventSinkImpl{
- Interface: k8sClient.GetClientSet().EventsV1()})
- eventBroadcaster.StartRecordingToSink(make(<-chan
struct{}))
- eventRecorder =
eventBroadcaster.NewRecorder(scheme.Scheme, constants.SchedulerName)
- }
- })
-
- return eventRecorder
+ return *eventRecorder.Load()
}
func SetRecorder(recorder events.EventRecorder) {
- lock.Lock()
- defer lock.Unlock()
- eventRecorder = recorder
- once.Do(func() {}) // make sure Do() doesn't fire elsewhere
+ eventRecorder.Store(&recorder)
}
diff --git a/pkg/common/events/recorder_test.go
b/pkg/common/events/recorder_test.go
index 7068ad26..b10e1f58 100644
--- a/pkg/common/events/recorder_test.go
+++ b/pkg/common/events/recorder_test.go
@@ -23,15 +23,10 @@ import (
"testing"
"gotest.tools/v3/assert"
-
- "github.com/apache/yunikorn-k8shim/pkg/conf"
)
func TestInit(t *testing.T) {
// simply test the get won't fail
- // which means the get function honors the testMode and
- // skips initiating a real event recorder
- conf.GetSchedulerConf().SetTestMode(true)
recorder := GetRecorder()
- assert.Equal(t, reflect.TypeOf(recorder).String(),
"*events.FakeRecorder")
+ assert.Equal(t, reflect.TypeOf(recorder).String(),
"*events.MockedRecorder")
}
diff --git a/pkg/shim/scheduler.go b/pkg/shim/scheduler.go
index ebf3fb11..291083f4 100644
--- a/pkg/shim/scheduler.go
+++ b/pkg/shim/scheduler.go
@@ -19,15 +19,20 @@
package shim
import (
+ ctx "context"
"time"
"go.uber.org/zap"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
+ "k8s.io/client-go/kubernetes/scheme"
+ k8events "k8s.io/client-go/tools/events"
"github.com/apache/yunikorn-k8shim/pkg/cache"
"github.com/apache/yunikorn-k8shim/pkg/client"
+ "github.com/apache/yunikorn-k8shim/pkg/common/constants"
+ "github.com/apache/yunikorn-k8shim/pkg/common/events"
"github.com/apache/yunikorn-k8shim/pkg/common/utils"
"github.com/apache/yunikorn-k8shim/pkg/conf"
"github.com/apache/yunikorn-k8shim/pkg/dispatcher"
@@ -67,6 +72,18 @@ func NewShimScheduler(scheduler api.SchedulerAPI, configs
*conf.SchedulerConf, b
apiFactory := client.NewAPIFactory(scheduler, informerFactory, configs,
false)
context := cache.NewContextWithBootstrapConfigMaps(apiFactory,
bootstrapConfigMaps)
rmCallback := cache.NewAsyncRMCallback(context)
+
+ eventBroadcaster := k8events.NewBroadcaster(&k8events.EventSinkImpl{
+ Interface: kubeClient.GetClientSet().EventsV1()})
+ err :=
eventBroadcaster.StartRecordingToSinkWithContext(ctx.Background())
+ if err != nil {
+ log.Log(log.Shim).Error("Could not create event broadcaster",
+ zap.Error(err))
+ } else {
+ eventRecorder := eventBroadcaster.NewRecorder(scheme.Scheme,
constants.SchedulerName)
+ events.SetRecorder(eventRecorder)
+ }
+
return newShimSchedulerInternal(context, apiFactory, rmCallback)
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]