This is an automated email from the ASF dual-hosted git repository.

ccondit pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-k8shim.git


The following commit(s) were added to refs/heads/master by this push:
     new beed0b24 [YUNIKORN-1930] Don't send all node events to the shim (#658)
beed0b24 is described below

commit beed0b2412b3ccc032772191c7182e4c026064de
Author: Peter Bacsko <[email protected]>
AuthorDate: Wed Aug 23 10:07:09 2023 -0500

    [YUNIKORN-1930] Don't send all node events to the shim (#658)
    
    Closes: #658
    
    Signed-off-by: Craig Condit <[email protected]>
---
 pkg/cache/context.go      |  17 ++++++--
 pkg/cache/context_test.go | 108 ++++++++++++++++++++++++++++++++++++++++++++--
 2 files changed, 118 insertions(+), 7 deletions(-)

diff --git a/pkg/cache/context.go b/pkg/cache/context.go
index ce401068..3495e725 100644
--- a/pkg/cache/context.go
+++ b/pkg/cache/context.go
@@ -982,6 +982,9 @@ func (ctx *Context) PublishEvents(eventRecords []*si.EventRecord) {
                                                zap.Stringer("event", record))
                                }
                        case si.EventRecord_NODE:
+                               if !isPublishableNodeEvent(record) {
+                                       continue
+                               }
                                nodeID := record.ObjectID
                                nodeInfo := ctx.schedulerCache.GetNode(nodeID)
                                if nodeInfo == nil {
@@ -999,9 +1002,6 @@ func (ctx *Context) PublishEvents(eventRecords []*si.EventRecord) {
                                }
                                events.GetRecorder().Eventf(node.DeepCopy(), nil,
                                        v1.EventTypeNormal, "", "", record.Message)
-                       default:
-                               log.Log(log.ShimContext).Warn("Unsupported event type, currently only supports to publish request event records",
-                                       zap.Stringer("type", record.Type))
                        }
                }
        }
@@ -1152,6 +1152,17 @@ func (ctx *Context) GetStateDump() (string, error) {
        return string(bytes), nil
 }
 
+func isPublishableNodeEvent(event *si.EventRecord) bool {
+       // we only send node added & removed event
+       if event.Type == si.EventRecord_NODE &&
+               ((event.EventChangeDetail == si.EventRecord_DETAILS_NONE && event.EventChangeType == si.EventRecord_ADD) ||
+                       (event.EventChangeDetail == si.EventRecord_NODE_DECOMISSION && event.EventChangeType == si.EventRecord_REMOVE)) {
+               return true
+       }
+
+       return false
+}
+
 // VisibleForTesting
 func (ctx *Context) GetSchedulerCache() *schedulercache.SchedulerCache {
        return ctx.schedulerCache
diff --git a/pkg/cache/context_test.go b/pkg/cache/context_test.go
index 57a80754..0a3af42d 100644
--- a/pkg/cache/context_test.go
+++ b/pkg/cache/context_test.go
@@ -950,18 +950,22 @@ func TestNodeEventPublishedCorrectly(t *testing.T) {
                },
        }
        context.addNode(&node)
+       err := waitForNodeAcceptedEvent(recorder)
+       assert.NilError(t, err, "node accepted event was not sent")
 
        eventRecords := make([]*si.EventRecord, 0)
        message := "node_related_message"
        eventRecords = append(eventRecords, &si.EventRecord{
-               Type:     si.EventRecord_NODE,
-               ObjectID: "host0001",
-               Message:  message,
+               Type:              si.EventRecord_NODE,
+               EventChangeType:   si.EventRecord_ADD,
+               EventChangeDetail: si.EventRecord_DETAILS_NONE,
+               ObjectID:          "host0001",
+               Message:           message,
        })
        context.PublishEvents(eventRecords)
 
        // check that the event has been published
-       err := utils.WaitForCondition(func() bool {
+       err = utils.WaitForCondition(func() bool {
                for {
                        select {
                        case event := <-recorder.Events:
@@ -977,6 +981,84 @@ func TestNodeEventPublishedCorrectly(t *testing.T) {
        assert.NilError(t, err, "event should have been emitted")
 }
 
+func TestFilteredEventsNotPublished(t *testing.T) {
+       conf.GetSchedulerConf().SetTestMode(true)
+       recorder, ok := events.GetRecorder().(*k8sEvents.FakeRecorder)
+       if !ok {
+               t.Fatal("the EventRecorder is expected to be of type FakeRecorder")
+       }
+       context := initContextForTest()
+
+       node := v1.Node{
+               ObjectMeta: apis.ObjectMeta{
+                       Name:      "host0001",
+                       Namespace: "default",
+                       UID:       "uid_0001",
+               },
+       }
+       context.addNode(&node)
+       err := waitForNodeAcceptedEvent(recorder)
+       assert.NilError(t, err, "node accepted event was not sent")
+
+       eventRecords := make([]*si.EventRecord, 7)
+       eventRecords[0] = &si.EventRecord{
+               Type:              si.EventRecord_NODE,
+               EventChangeType:   si.EventRecord_SET,
+               EventChangeDetail: si.EventRecord_NODE_SCHEDULABLE,
+               ObjectID:          "host0001",
+               Message:           "",
+       }
+       eventRecords[1] = &si.EventRecord{
+               Type:              si.EventRecord_NODE,
+               EventChangeType:   si.EventRecord_SET,
+               EventChangeDetail: si.EventRecord_NODE_READY,
+               ObjectID:          "host0001",
+               Message:           "",
+       }
+       eventRecords[2] = &si.EventRecord{
+               Type:              si.EventRecord_NODE,
+               EventChangeType:   si.EventRecord_SET,
+               EventChangeDetail: si.EventRecord_NODE_OCCUPIED,
+               ObjectID:          "host0001",
+               Message:           "",
+       }
+       eventRecords[3] = &si.EventRecord{
+               Type:              si.EventRecord_NODE,
+               EventChangeType:   si.EventRecord_SET,
+               EventChangeDetail: si.EventRecord_NODE_CAPACITY,
+               ObjectID:          "host0001",
+               Message:           "",
+       }
+       eventRecords[4] = &si.EventRecord{
+               Type:              si.EventRecord_NODE,
+               EventChangeType:   si.EventRecord_ADD,
+               EventChangeDetail: si.EventRecord_NODE_ALLOC,
+               ObjectID:          "host0001",
+               Message:           "",
+       }
+       eventRecords[5] = &si.EventRecord{
+               Type:              si.EventRecord_APP,
+               EventChangeType:   si.EventRecord_ADD,
+               EventChangeDetail: si.EventRecord_APP_STARTING,
+               ObjectID:          "app-1",
+               Message:           "",
+       }
+       eventRecords[6] = &si.EventRecord{
+               Type:              si.EventRecord_QUEUE,
+               EventChangeType:   si.EventRecord_ADD,
+               EventChangeDetail: si.EventRecord_DETAILS_NONE,
+               ObjectID:          "root.test",
+               Message:           "",
+       }
+       context.PublishEvents(eventRecords)
+
+       select {
+       case e := <-recorder.Events:
+               t.Errorf("received an unexpected event %s", e)
+       default:
+       }
+}
+
 func TestPublishEventsWithNotExistingAsk(t *testing.T) {
        conf.GetSchedulerConf().SetTestMode(true)
        recorder, ok := events.GetRecorder().(*k8sEvents.FakeRecorder)
@@ -1497,3 +1579,21 @@ func TestCtxUpdatePodCondition(t *testing.T) {
        updated = context.updatePodCondition(task, &condition)
        assert.Equal(t, true, updated)
 }
+
+func waitForNodeAcceptedEvent(recorder *k8sEvents.FakeRecorder) error {
+       // fetch the "node accepted" event
+       err := utils.WaitForCondition(func() bool {
+               for {
+                       select {
+                       case event := <-recorder.Events:
+                               log.Log(log.Test).Info(event)
+                               if strings.Contains(event, "accepted by the scheduler") {
+                                       return true
+                               }
+                       default:
+                               return false
+                       }
+               }
+       }, 10*time.Millisecond, time.Second)
+       return err
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to