This is an automated email from the ASF dual-hosted git repository.

ccondit pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-k8shim.git


The following commit(s) were added to refs/heads/master by this push:
     new 4d6b4dec [YUNIKORN-2844] Inject event recorder externally (#922)
4d6b4dec is described below

commit 4d6b4dec2a986a7c2bbb1aa802b90073fd881314
Author: Peter Bacsko <[email protected]>
AuthorDate: Wed Oct 9 09:26:04 2024 -0600

    [YUNIKORN-2844] Inject event recorder externally (#922)
    
    Closes: #922
    
    Signed-off-by: Craig Condit <[email protected]>
---
 pkg/common/events/recorder.go      | 39 +++++++++-----------------------------
 pkg/common/events/recorder_test.go |  7 +------
 pkg/shim/scheduler.go              | 17 +++++++++++++++++
 3 files changed, 27 insertions(+), 36 deletions(-)

diff --git a/pkg/common/events/recorder.go b/pkg/common/events/recorder.go
index 48a06fa7..cde9a64f 100644
--- a/pkg/common/events/recorder.go
+++ b/pkg/common/events/recorder.go
@@ -19,43 +19,22 @@
 package events
 
 import (
-       "sync"
+       "sync/atomic"
 
-       "k8s.io/client-go/kubernetes/scheme"
        "k8s.io/client-go/tools/events"
-
-       "github.com/apache/yunikorn-k8shim/pkg/client"
-       "github.com/apache/yunikorn-k8shim/pkg/common/constants"
-       "github.com/apache/yunikorn-k8shim/pkg/conf"
-       "github.com/apache/yunikorn-k8shim/pkg/locking"
 )
 
-var eventRecorder events.EventRecorder = events.NewFakeRecorder(1024)
-var once sync.Once
-var lock locking.RWMutex
+var eventRecorder atomic.Pointer[events.EventRecorder]
+
+func init() {
+       r := events.EventRecorder(NewMockedRecorder())
+       eventRecorder.Store(&r)
+}
 
 func GetRecorder() events.EventRecorder {
-       lock.Lock()
-       defer lock.Unlock()
-       once.Do(func() {
-               // note, the initiation of the event recorder requires on a 
workable Kubernetes client,
-               // in test mode we should skip this and just use a fake 
recorder instead.
-               configs := conf.GetSchedulerConf()
-               if !configs.IsTestMode() {
-                       k8sClient := client.NewKubeClient(configs.KubeConfig)
-                       eventBroadcaster := 
events.NewBroadcaster(&events.EventSinkImpl{
-                               Interface: k8sClient.GetClientSet().EventsV1()})
-                       eventBroadcaster.StartRecordingToSink(make(<-chan 
struct{}))
-                       eventRecorder = 
eventBroadcaster.NewRecorder(scheme.Scheme, constants.SchedulerName)
-               }
-       })
-
-       return eventRecorder
+       return *eventRecorder.Load()
 }
 
 func SetRecorder(recorder events.EventRecorder) {
-       lock.Lock()
-       defer lock.Unlock()
-       eventRecorder = recorder
-       once.Do(func() {}) // make sure Do() doesn't fire elsewhere
+       eventRecorder.Store(&recorder)
 }
diff --git a/pkg/common/events/recorder_test.go 
b/pkg/common/events/recorder_test.go
index 7068ad26..b10e1f58 100644
--- a/pkg/common/events/recorder_test.go
+++ b/pkg/common/events/recorder_test.go
@@ -23,15 +23,10 @@ import (
        "testing"
 
        "gotest.tools/v3/assert"
-
-       "github.com/apache/yunikorn-k8shim/pkg/conf"
 )
 
 func TestInit(t *testing.T) {
        // simply test the get won't fail
-       // which means the get function honors the testMode and
-       // skips initiating a real event recorder
-       conf.GetSchedulerConf().SetTestMode(true)
        recorder := GetRecorder()
-       assert.Equal(t, reflect.TypeOf(recorder).String(), 
"*events.FakeRecorder")
+       assert.Equal(t, reflect.TypeOf(recorder).String(), 
"*events.MockedRecorder")
 }
diff --git a/pkg/shim/scheduler.go b/pkg/shim/scheduler.go
index ebf3fb11..291083f4 100644
--- a/pkg/shim/scheduler.go
+++ b/pkg/shim/scheduler.go
@@ -19,15 +19,20 @@
 package shim
 
 import (
+       ctx "context"
        "time"
 
        "go.uber.org/zap"
        v1 "k8s.io/api/core/v1"
        "k8s.io/apimachinery/pkg/util/wait"
        "k8s.io/client-go/informers"
+       "k8s.io/client-go/kubernetes/scheme"
+       k8events "k8s.io/client-go/tools/events"
 
        "github.com/apache/yunikorn-k8shim/pkg/cache"
        "github.com/apache/yunikorn-k8shim/pkg/client"
+       "github.com/apache/yunikorn-k8shim/pkg/common/constants"
+       "github.com/apache/yunikorn-k8shim/pkg/common/events"
        "github.com/apache/yunikorn-k8shim/pkg/common/utils"
        "github.com/apache/yunikorn-k8shim/pkg/conf"
        "github.com/apache/yunikorn-k8shim/pkg/dispatcher"
@@ -67,6 +72,18 @@ func NewShimScheduler(scheduler api.SchedulerAPI, configs 
*conf.SchedulerConf, b
        apiFactory := client.NewAPIFactory(scheduler, informerFactory, configs, 
false)
        context := cache.NewContextWithBootstrapConfigMaps(apiFactory, 
bootstrapConfigMaps)
        rmCallback := cache.NewAsyncRMCallback(context)
+
+       eventBroadcaster := k8events.NewBroadcaster(&k8events.EventSinkImpl{
+               Interface: kubeClient.GetClientSet().EventsV1()})
+       err := 
eventBroadcaster.StartRecordingToSinkWithContext(ctx.Background())
+       if err != nil {
+               log.Log(log.Shim).Error("Could not create event broadcaster",
+                       zap.Error(err))
+       } else {
+               eventRecorder := eventBroadcaster.NewRecorder(scheme.Scheme, 
constants.SchedulerName)
+               events.SetRecorder(eventRecorder)
+       }
+
        return newShimSchedulerInternal(context, apiFactory, rmCallback)
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to