This is an automated email from the ASF dual-hosted git repository.
pbacsko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git
The following commit(s) were added to refs/heads/master by this push:
new bd9a060c [YUNIKORN-2568] Move all xxxEvents types to objects/events (#903)
bd9a060c is described below
commit bd9a060c3097e5400e38e0c4d23d9d75fd056481
Author: Peter Bacsko <[email protected]>
AuthorDate: Tue Jul 2 23:26:10 2024 +0200
[YUNIKORN-2568] Move all xxxEvents types to objects/events (#903)
Closes: #903
Signed-off-by: Peter Bacsko <[email protected]>
---
pkg/scheduler/objects/allocation_ask.go | 17 +-
pkg/scheduler/objects/allocation_ask_test.go | 5 +-
pkg/scheduler/objects/application.go | 29 ++--
pkg/scheduler/objects/application_events.go | 148 ----------------
pkg/scheduler/objects/application_state.go | 2 +-
pkg/scheduler/objects/application_state_test.go | 7 +
pkg/scheduler/objects/application_test.go | 9 +-
pkg/scheduler/objects/events/application_events.go | 150 ++++++++++++++++
.../{ => events}/application_events_test.go | 190 ++++++++-------------
pkg/scheduler/objects/{ => events}/ask_events.go | 22 +--
.../objects/{ => events}/ask_events_test.go | 52 +++---
pkg/scheduler/objects/{ => events}/node_events.go | 26 +--
.../objects/{ => events}/node_events_test.go | 78 ++++-----
pkg/scheduler/objects/{ => events}/queue_events.go | 22 +--
.../objects/{ => events}/queue_events_test.go | 74 ++++----
pkg/scheduler/objects/node.go | 23 +--
pkg/scheduler/objects/node_test.go | 5 +-
pkg/scheduler/objects/queue.go | 33 ++--
pkg/scheduler/objects/queue_test.go | 12 +-
pkg/scheduler/objects/utilities_test.go | 3 +-
20 files changed, 442 insertions(+), 465 deletions(-)
diff --git a/pkg/scheduler/objects/allocation_ask.go b/pkg/scheduler/objects/allocation_ask.go
index 341a19e9..09be0acb 100644
--- a/pkg/scheduler/objects/allocation_ask.go
+++ b/pkg/scheduler/objects/allocation_ask.go
@@ -30,6 +30,7 @@ import (
"github.com/apache/yunikorn-core/pkg/events"
"github.com/apache/yunikorn-core/pkg/locking"
"github.com/apache/yunikorn-core/pkg/log"
+ schedEvt "github.com/apache/yunikorn-core/pkg/scheduler/objects/events"
siCommon "github.com/apache/yunikorn-scheduler-interface/lib/go/common"
"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
)
@@ -59,7 +60,7 @@ type AllocationAsk struct {
scaleUpTriggered bool // whether this ask has triggered
autoscaling or not
resKeyPerNode map[string]string // reservation key for a given
node
- askEvents *askEvents
+ askEvents *schedEvt.AskEvents
userQuotaCheckFailed bool
headroomCheckFailed bool
@@ -79,7 +80,7 @@ func NewAllocationAsk(allocationKey string, applicationID
string, allocatedResou
allocatedResource: allocatedResource,
allocLog: make(map[string]*AllocationLogEntry),
resKeyPerNode: make(map[string]string),
- askEvents: newAskEvents(events.GetEventSystem()),
+ askEvents:
schedEvt.NewAskEvents(events.GetEventSystem()),
}
aa.resKeyWithoutNode = reservationKeyWithoutNode(applicationID,
allocationKey)
return aa
@@ -112,7 +113,7 @@ func NewAllocationAskFromSI(ask *si.AllocationAsk)
*AllocationAsk {
originator: ask.Originator,
allocLog: make(map[string]*AllocationLogEntry),
resKeyPerNode: make(map[string]string),
- askEvents: newAskEvents(events.GetEventSystem()),
+ askEvents:
schedEvt.NewAskEvents(events.GetEventSystem()),
}
// this is a safety check placeholder and task group name must be set
as a combo
// order is important as task group can be set without placeholder but
not the other way around
@@ -273,7 +274,7 @@ func (aa *AllocationAsk) LogAllocationFailure(message
string, allocate bool) {
}
func (aa *AllocationAsk) SendPredicateFailedEvent(message string) {
- aa.askEvents.sendPredicateFailed(aa.allocationKey, aa.applicationID,
message, aa.GetAllocatedResource())
+ aa.askEvents.SendPredicateFailed(aa.allocationKey, aa.applicationID,
message, aa.GetAllocatedResource())
}
// GetAllocationLog returns a list of log entries corresponding to allocation
preconditions not being met
@@ -357,7 +358,7 @@ func (aa *AllocationAsk) setHeadroomCheckFailed(headroom
*resources.Resource, qu
defer aa.Unlock()
if !aa.headroomCheckFailed {
aa.headroomCheckFailed = true
- aa.askEvents.sendRequestExceedsQueueHeadroom(aa.allocationKey,
aa.applicationID, headroom, aa.allocatedResource, queue)
+ aa.askEvents.SendRequestExceedsQueueHeadroom(aa.allocationKey,
aa.applicationID, headroom, aa.allocatedResource, queue)
}
}
@@ -366,7 +367,7 @@ func (aa *AllocationAsk) setHeadroomCheckPassed(queue
string) {
defer aa.Unlock()
if aa.headroomCheckFailed {
aa.headroomCheckFailed = false
- aa.askEvents.sendRequestFitsInQueue(aa.allocationKey,
aa.applicationID, queue, aa.allocatedResource)
+ aa.askEvents.SendRequestFitsInQueue(aa.allocationKey,
aa.applicationID, queue, aa.allocatedResource)
}
}
@@ -375,7 +376,7 @@ func (aa *AllocationAsk) setUserQuotaCheckFailed(available
*resources.Resource)
defer aa.Unlock()
if !aa.userQuotaCheckFailed {
aa.userQuotaCheckFailed = true
- aa.askEvents.sendRequestExceedsUserQuota(aa.allocationKey,
aa.applicationID, available, aa.allocatedResource)
+ aa.askEvents.SendRequestExceedsUserQuota(aa.allocationKey,
aa.applicationID, available, aa.allocatedResource)
}
}
@@ -384,6 +385,6 @@ func (aa *AllocationAsk) setUserQuotaCheckPassed() {
defer aa.Unlock()
if aa.userQuotaCheckFailed {
aa.userQuotaCheckFailed = false
- aa.askEvents.sendRequestFitsInUserQuota(aa.allocationKey,
aa.applicationID, aa.allocatedResource)
+ aa.askEvents.SendRequestFitsInUserQuota(aa.allocationKey,
aa.applicationID, aa.allocatedResource)
}
}
diff --git a/pkg/scheduler/objects/allocation_ask_test.go b/pkg/scheduler/objects/allocation_ask_test.go
index a9739754..ab8d722a 100644
--- a/pkg/scheduler/objects/allocation_ask_test.go
+++ b/pkg/scheduler/objects/allocation_ask_test.go
@@ -27,6 +27,7 @@ import (
"github.com/apache/yunikorn-core/pkg/common/resources"
"github.com/apache/yunikorn-core/pkg/events/mock"
+ schedEvt "github.com/apache/yunikorn-core/pkg/scheduler/objects/events"
"github.com/apache/yunikorn-scheduler-interface/lib/go/common"
"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
)
@@ -213,12 +214,12 @@ func TestSendPredicateFailed(t *testing.T) {
}
ask := NewAllocationAskFromSI(siAsk)
eventSystem := mock.NewEventSystemDisabled()
- ask.askEvents = newAskEvents(eventSystem)
+ ask.askEvents = schedEvt.NewAskEvents(eventSystem)
ask.SendPredicateFailedEvent("failed")
assert.Equal(t, 0, len(eventSystem.Events))
eventSystem = mock.NewEventSystem()
- ask.askEvents = newAskEvents(eventSystem)
+ ask.askEvents = schedEvt.NewAskEvents(eventSystem)
ask.SendPredicateFailedEvent("failure")
assert.Equal(t, 1, len(eventSystem.Events))
event := eventSystem.Events[0]
diff --git a/pkg/scheduler/objects/application.go b/pkg/scheduler/objects/application.go
index e3325f57..b15e8e80 100644
--- a/pkg/scheduler/objects/application.go
+++ b/pkg/scheduler/objects/application.go
@@ -39,6 +39,7 @@ import (
"github.com/apache/yunikorn-core/pkg/log"
"github.com/apache/yunikorn-core/pkg/metrics"
"github.com/apache/yunikorn-core/pkg/rmproxy/rmevent"
+ schedEvt "github.com/apache/yunikorn-core/pkg/scheduler/objects/events"
"github.com/apache/yunikorn-core/pkg/scheduler/ugm"
siCommon "github.com/apache/yunikorn-scheduler-interface/lib/go/common"
"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
@@ -117,7 +118,7 @@ type Application struct {
rmEventHandler handler.EventHandler
rmID string
terminatedCallback func(appID string)
- appEvents *applicationEvents
+ appEvents *schedEvt.ApplicationEvents
sendStateChangeEvents bool // whether to send state-change events or
not (simplifies testing)
locking.RWMutex
@@ -187,8 +188,8 @@ func NewApplication(siApp *si.AddApplicationRequest, ugi
security.UserGroup, eve
app.user = ugi
app.rmEventHandler = eventHandler
app.rmID = rmID
- app.appEvents = newApplicationEvents(events.GetEventSystem())
- app.appEvents.sendNewApplicationEvent(app.ApplicationID)
+ app.appEvents = schedEvt.NewApplicationEvents(events.GetEventSystem())
+ app.appEvents.SendNewApplicationEvent(app.ApplicationID)
return app
}
@@ -553,7 +554,7 @@ func (sa *Application) removeAsksInternal(allocKey string,
detail si.EventRecord
deltaPendingResource = sa.pending
sa.pending = resources.NewResource()
for _, ask := range sa.requests {
- sa.appEvents.sendRemoveAskEvent(ask, detail)
+ sa.appEvents.SendRemoveAskEvent(sa.ApplicationID,
ask.allocationKey, ask.allocatedResource, detail)
}
sa.requests = make(map[string]*AllocationAsk)
sa.sortedRequests = sortedRequests{}
@@ -582,7 +583,7 @@ func (sa *Application) removeAsksInternal(allocKey string,
detail si.EventRecord
}
delete(sa.requests, allocKey)
sa.sortedRequests.remove(ask)
- sa.appEvents.sendRemoveAskEvent(ask, detail)
+ sa.appEvents.SendRemoveAskEvent(sa.ApplicationID,
ask.allocationKey, ask.allocatedResource, detail)
if priority := ask.GetPriority(); priority >=
sa.askMaxPriority {
sa.updateAskMaxPriority()
}
@@ -655,7 +656,7 @@ func (sa *Application) AddAllocationAsk(ask *AllocationAsk)
error {
zap.Bool("placeholder", ask.IsPlaceholder()),
zap.Stringer("pendingDelta", delta))
sa.sortedRequests.insert(ask)
- sa.appEvents.sendNewAskEvent(ask)
+ sa.appEvents.SendNewAskEvent(sa.ApplicationID, ask.allocationKey,
ask.allocatedResource)
return nil
}
@@ -1145,7 +1146,7 @@ func (sa *Application)
tryPlaceholderAllocate(nodeIterator func() NodeIterator,
// release the placeholder and tell the RM
ph.SetReleased(true)
sa.notifyRMAllocationReleased([]*Allocation{ph}, si.TerminationType_TIMEOUT,
"cancel placeholder: resource incompatible")
- sa.appEvents.sendPlaceholderLargerEvent(ph,
request)
+
sa.appEvents.SendPlaceholderLargerEvent(ph.taskGroupName, sa.ApplicationID,
ph.allocationKey, request.allocatedResource, ph.allocatedResource)
continue
}
// placeholder is the same or larger continue
processing and difference is handled when the placeholder
@@ -1667,7 +1668,7 @@ func (sa *Application) addAllocationInternal(allocType
AllocationResultType, all
sa.allocatedResource = resources.Add(sa.allocatedResource,
alloc.GetAllocatedResource())
sa.maxAllocatedResource =
resources.ComponentWiseMax(sa.allocatedResource, sa.maxAllocatedResource)
}
- sa.appEvents.sendNewAllocationEvent(alloc)
+ sa.appEvents.SendNewAllocationEvent(sa.ApplicationID,
alloc.allocationKey, alloc.allocatedResource)
sa.allocations[alloc.GetAllocationKey()] = alloc
}
@@ -1822,7 +1823,7 @@ func (sa *Application)
removeAllocationInternal(allocationKey string, releaseTyp
}
}
delete(sa.allocations, allocationKey)
- sa.appEvents.sendRemoveAllocationEvent(alloc, releaseType)
+ sa.appEvents.SendRemoveAllocationEvent(sa.ApplicationID,
alloc.allocationKey, alloc.allocatedResource, releaseType)
return alloc
}
@@ -1853,7 +1854,7 @@ func (sa *Application) RemoveAllAllocations()
[]*Allocation {
allocationsToRelease = append(allocationsToRelease, alloc)
// Aggregate the resources used by this alloc to the
application's user resource tracker
sa.trackCompletedResource(alloc)
- sa.appEvents.sendRemoveAllocationEvent(alloc,
si.TerminationType_STOPPED_BY_RM)
+ sa.appEvents.SendRemoveAllocationEvent(sa.ApplicationID,
alloc.allocationKey, alloc.allocatedResource, si.TerminationType_STOPPED_BY_RM)
}
// if an app doesn't have any allocations and the user doesn't have
other applications,
@@ -2098,12 +2099,12 @@ func (sa *Application)
updateRunnableStatus(runnableInQueue, runnableByUserLimit
log.Log(log.SchedApplication).Info("Application is now
runnable in queue",
zap.String("appID", sa.ApplicationID),
zap.String("queue", sa.queuePath))
-
sa.appEvents.sendAppRunnableInQueueEvent(sa.ApplicationID)
+
sa.appEvents.SendAppRunnableInQueueEvent(sa.ApplicationID)
} else {
log.Log(log.SchedApplication).Info("Maximum number of
running applications reached the queue limit",
zap.String("appID", sa.ApplicationID),
zap.String("queue", sa.queuePath))
-
sa.appEvents.sendAppNotRunnableInQueueEvent(sa.ApplicationID)
+
sa.appEvents.SendAppNotRunnableInQueueEvent(sa.ApplicationID)
}
}
sa.runnableInQueue = runnableInQueue
@@ -2115,14 +2116,14 @@ func (sa *Application)
updateRunnableStatus(runnableInQueue, runnableByUserLimit
zap.String("queue", sa.queuePath),
zap.String("user", sa.user.User),
zap.Strings("groups", sa.user.Groups))
- sa.appEvents.sendAppRunnableQuotaEvent(sa.ApplicationID)
+ sa.appEvents.SendAppRunnableQuotaEvent(sa.ApplicationID)
} else {
log.Log(log.SchedApplication).Info("Maximum number of
running applications reached the user/group limit",
zap.String("appID", sa.ApplicationID),
zap.String("queue", sa.queuePath),
zap.String("user", sa.user.User),
zap.Strings("groups", sa.user.Groups))
-
sa.appEvents.sendAppNotRunnableQuotaEvent(sa.ApplicationID)
+
sa.appEvents.SendAppNotRunnableQuotaEvent(sa.ApplicationID)
}
}
sa.runnableByUserLimit = runnableByUserLimit
diff --git a/pkg/scheduler/objects/application_events.go b/pkg/scheduler/objects/application_events.go
deleted file mode 100644
index 04fe51a0..00000000
--- a/pkg/scheduler/objects/application_events.go
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
-*/
-
-package objects
-
-import (
- "fmt"
- "github.com/apache/yunikorn-core/pkg/common"
- "github.com/apache/yunikorn-core/pkg/events"
- "github.com/apache/yunikorn-scheduler-interface/lib/go/si"
-)
-
-type applicationEvents struct {
- eventSystem events.EventSystem
-}
-
-func (evt *applicationEvents) sendPlaceholderLargerEvent(ph *Allocation,
request *AllocationAsk) {
- if !evt.eventSystem.IsEventTrackingEnabled() {
- return
- }
- message := fmt.Sprintf("Task group '%s' in application '%s': allocation
resources '%s' are not matching placeholder '%s' allocation with ID '%s'",
ph.GetTaskGroup(), ph.GetApplicationID(), request.GetAllocatedResource(),
ph.GetAllocatedResource(), ph.GetAllocationKey())
- event := events.CreateRequestEventRecord(ph.GetAllocationKey(),
ph.GetApplicationID(), message, request.GetAllocatedResource())
- evt.eventSystem.AddEvent(event)
-}
-
-func (evt *applicationEvents) sendNewAllocationEvent(alloc *Allocation) {
- if !evt.eventSystem.IsEventTrackingEnabled() {
- return
- }
- event := events.CreateAppEventRecord(alloc.GetApplicationID(),
common.Empty, alloc.GetAllocationKey(), si.EventRecord_ADD,
si.EventRecord_APP_ALLOC, alloc.GetAllocatedResource())
- evt.eventSystem.AddEvent(event)
-}
-
-func (evt *applicationEvents) sendNewAskEvent(request *AllocationAsk) {
- if !evt.eventSystem.IsEventTrackingEnabled() {
- return
- }
- event := events.CreateAppEventRecord(request.GetApplicationID(),
common.Empty, request.GetAllocationKey(), si.EventRecord_ADD,
si.EventRecord_APP_REQUEST, request.GetAllocatedResource())
- evt.eventSystem.AddEvent(event)
-}
-
-func (evt *applicationEvents) sendRemoveAllocationEvent(alloc *Allocation,
terminationType si.TerminationType) {
- if !evt.eventSystem.IsEventTrackingEnabled() {
- return
- }
-
- var eventChangeDetail si.EventRecord_ChangeDetail
- switch terminationType {
- case si.TerminationType_UNKNOWN_TERMINATION_TYPE:
- eventChangeDetail = si.EventRecord_ALLOC_NODEREMOVED
- case si.TerminationType_STOPPED_BY_RM:
- eventChangeDetail = si.EventRecord_ALLOC_CANCEL
- case si.TerminationType_TIMEOUT:
- eventChangeDetail = si.EventRecord_ALLOC_TIMEOUT
- case si.TerminationType_PREEMPTED_BY_SCHEDULER:
- eventChangeDetail = si.EventRecord_ALLOC_PREEMPT
- case si.TerminationType_PLACEHOLDER_REPLACED:
- eventChangeDetail = si.EventRecord_ALLOC_REPLACED
- }
-
- event := events.CreateAppEventRecord(alloc.GetApplicationID(),
common.Empty, alloc.GetAllocationKey(), si.EventRecord_REMOVE,
eventChangeDetail, alloc.GetAllocatedResource())
- evt.eventSystem.AddEvent(event)
-}
-
-func (evt *applicationEvents) sendRemoveAskEvent(request *AllocationAsk,
detail si.EventRecord_ChangeDetail) {
- if !evt.eventSystem.IsEventTrackingEnabled() {
- return
- }
- event := events.CreateAppEventRecord(request.GetApplicationID(),
common.Empty, request.GetAllocationKey(), si.EventRecord_REMOVE, detail,
request.GetAllocatedResource())
- evt.eventSystem.AddEvent(event)
-}
-
-func (evt *applicationEvents) sendNewApplicationEvent(appID string) {
- if !evt.eventSystem.IsEventTrackingEnabled() {
- return
- }
- event := events.CreateAppEventRecord(appID, common.Empty, common.Empty,
si.EventRecord_ADD, si.EventRecord_APP_NEW, nil)
- evt.eventSystem.AddEvent(event)
-}
-
-func (evt *applicationEvents) sendRemoveApplicationEvent(appID string) {
- if !evt.eventSystem.IsEventTrackingEnabled() {
- return
- }
- event := events.CreateAppEventRecord(appID, common.Empty, common.Empty,
si.EventRecord_REMOVE, si.EventRecord_DETAILS_NONE, nil)
- evt.eventSystem.AddEvent(event)
-}
-
-func (evt *applicationEvents) sendStateChangeEvent(appID string, changeDetail
si.EventRecord_ChangeDetail, eventInfo string) {
- if !evt.eventSystem.IsEventTrackingEnabled() {
- return
- }
- event := events.CreateAppEventRecord(appID, eventInfo, common.Empty,
si.EventRecord_SET, changeDetail, nil)
- evt.eventSystem.AddEvent(event)
-}
-
-func (evt *applicationEvents) sendAppNotRunnableInQueueEvent(appID string) {
- if !evt.eventSystem.IsEventTrackingEnabled() {
- return
- }
- event := events.CreateAppEventRecord(appID, common.Empty, common.Empty,
si.EventRecord_NONE, si.EventRecord_APP_CANNOTRUN_QUEUE, nil)
- evt.eventSystem.AddEvent(event)
-}
-
-func (evt *applicationEvents) sendAppRunnableInQueueEvent(appID string) {
- if !evt.eventSystem.IsEventTrackingEnabled() {
- return
- }
- event := events.CreateAppEventRecord(appID, common.Empty, common.Empty,
si.EventRecord_NONE, si.EventRecord_APP_RUNNABLE_QUEUE, nil)
- evt.eventSystem.AddEvent(event)
-}
-
-func (evt *applicationEvents) sendAppNotRunnableQuotaEvent(appID string) {
- if !evt.eventSystem.IsEventTrackingEnabled() {
- return
- }
- event := events.CreateAppEventRecord(appID, common.Empty, common.Empty,
si.EventRecord_NONE, si.EventRecord_APP_CANNOTRUN_QUOTA, nil)
- evt.eventSystem.AddEvent(event)
-}
-
-func (evt *applicationEvents) sendAppRunnableQuotaEvent(appID string) {
- if !evt.eventSystem.IsEventTrackingEnabled() {
- return
- }
- event := events.CreateAppEventRecord(appID, common.Empty, common.Empty,
si.EventRecord_NONE, si.EventRecord_APP_RUNNABLE_QUOTA, nil)
- evt.eventSystem.AddEvent(event)
-}
-
-func newApplicationEvents(evt events.EventSystem) *applicationEvents {
- return &applicationEvents{
- eventSystem: evt,
- }
-}
diff --git a/pkg/scheduler/objects/application_state.go b/pkg/scheduler/objects/application_state.go
index f0d89a95..4b4251fe 100644
--- a/pkg/scheduler/objects/application_state.go
+++ b/pkg/scheduler/objects/application_state.go
@@ -159,7 +159,7 @@ func NewAppState() *fsm.FSM {
return
}
if app.sendStateChangeEvents {
-
app.appEvents.sendStateChangeEvent(app.ApplicationID, eventDetails, eventInfo)
+
app.appEvents.SendStateChangeEvent(app.ApplicationID, eventDetails, eventInfo)
}
},
"leave_state": func(_ context.Context, event
*fsm.Event) {
diff --git a/pkg/scheduler/objects/application_state_test.go b/pkg/scheduler/objects/application_state_test.go
index 3b64e3fe..ba54aa37 100644
--- a/pkg/scheduler/objects/application_state_test.go
+++ b/pkg/scheduler/objects/application_state_test.go
@@ -515,3 +515,10 @@ func createQueue(t *testing.T, queueName string) *Queue {
assert.NilError(t, err, "failed to create queue: %v", err)
return queue
}
+
+func isStateChangeEvent(t *testing.T, app *Application, changeDetail
si.EventRecord_ChangeDetail, record *si.EventRecord) {
+ assert.Equal(t, si.EventRecord_APP, record.Type, "incorrect event type,
expect app")
+ assert.Equal(t, app.ApplicationID, record.ObjectID, "incorrect object
ID, expected application ID")
+ assert.Equal(t, si.EventRecord_SET, record.EventChangeType, "incorrect
change type, expected set")
+ assert.Equal(t, changeDetail, record.EventChangeDetail, "incorrect
change detail")
+}
diff --git a/pkg/scheduler/objects/application_test.go b/pkg/scheduler/objects/application_test.go
index 6a942c29..bb84a900 100644
--- a/pkg/scheduler/objects/application_test.go
+++ b/pkg/scheduler/objects/application_test.go
@@ -35,6 +35,7 @@ import (
"github.com/apache/yunikorn-core/pkg/handler"
"github.com/apache/yunikorn-core/pkg/rmproxy"
"github.com/apache/yunikorn-core/pkg/rmproxy/rmevent"
+ schedEvt "github.com/apache/yunikorn-core/pkg/scheduler/objects/events"
"github.com/apache/yunikorn-core/pkg/scheduler/ugm"
siCommon "github.com/apache/yunikorn-scheduler-interface/lib/go/common"
"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
@@ -2239,7 +2240,7 @@ func TestRequestDoesNotFitQueueEvents(t *testing.T) {
ask := newAllocationAsk("alloc-0", "app-1", res)
app := newApplication(appID1, "default", "root.default")
eventSystem := mock.NewEventSystem()
- ask.askEvents = newAskEvents(eventSystem)
+ ask.askEvents = schedEvt.NewAskEvents(eventSystem)
app.disableStateChangeEvents()
app.resetAppEvents()
queue, err := createRootQueue(nil)
@@ -2310,7 +2311,7 @@ func TestRequestDoesNotFitUserQuotaQueueEvents(t
*testing.T) {
ask := newAllocationAsk("alloc-0", "app-1", res)
app := newApplication(appID1, "default", "root")
eventSystem := mock.NewEventSystem()
- ask.askEvents = newAskEvents(eventSystem)
+ ask.askEvents = schedEvt.NewAskEvents(eventSystem)
app.disableStateChangeEvents()
app.resetAppEvents()
queue, err := createRootQueue(nil)
@@ -2611,7 +2612,7 @@ func TestUpdateRunnableStatus(t *testing.T) {
assert.Assert(t, app.runnableInQueue)
assert.Assert(t, app.runnableByUserLimit)
eventSystem := mock.NewEventSystem()
- app.appEvents = newApplicationEvents(eventSystem)
+ app.appEvents = schedEvt.NewApplicationEvents(eventSystem)
// App runnable - no events
app.updateRunnableStatus(true, true)
@@ -2739,5 +2740,5 @@ func (sa *Application) disableStateChangeEvents() {
func (sa *Application) resetAppEvents() {
sa.Lock()
defer sa.Unlock()
- sa.appEvents = newApplicationEvents(events.GetEventSystem())
+ sa.appEvents = schedEvt.NewApplicationEvents(events.GetEventSystem())
}
diff --git a/pkg/scheduler/objects/events/application_events.go b/pkg/scheduler/objects/events/application_events.go
new file mode 100644
index 00000000..2f870fcf
--- /dev/null
+++ b/pkg/scheduler/objects/events/application_events.go
@@ -0,0 +1,150 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package events
+
+import (
+ "fmt"
+
+ "github.com/apache/yunikorn-core/pkg/common"
+ "github.com/apache/yunikorn-core/pkg/common/resources"
+ "github.com/apache/yunikorn-core/pkg/events"
+ "github.com/apache/yunikorn-scheduler-interface/lib/go/si"
+)
+
+type ApplicationEvents struct {
+ eventSystem events.EventSystem
+}
+
+func (ae *ApplicationEvents) SendPlaceholderLargerEvent(taskGroup, appID,
phAllocKey string, askRes, phRes *resources.Resource) {
+ if !ae.eventSystem.IsEventTrackingEnabled() {
+ return
+ }
+ message := fmt.Sprintf("Task group '%s' in application '%s': allocation
resources '%s' are not matching placeholder '%s' allocation with ID '%s'",
taskGroup, appID, askRes, phRes, phAllocKey)
+ event := events.CreateRequestEventRecord(phAllocKey, appID, message,
askRes)
+ ae.eventSystem.AddEvent(event)
+}
+
+func (ae *ApplicationEvents) SendNewAllocationEvent(appID, allocKey string,
allocated *resources.Resource) {
+ if !ae.eventSystem.IsEventTrackingEnabled() {
+ return
+ }
+ event := events.CreateAppEventRecord(appID, common.Empty, allocKey,
si.EventRecord_ADD, si.EventRecord_APP_ALLOC, allocated)
+ ae.eventSystem.AddEvent(event)
+}
+
+func (ae *ApplicationEvents) SendNewAskEvent(appID, allocKey string, allocated
*resources.Resource) {
+ if !ae.eventSystem.IsEventTrackingEnabled() {
+ return
+ }
+ event := events.CreateAppEventRecord(appID, common.Empty, allocKey,
si.EventRecord_ADD, si.EventRecord_APP_REQUEST, allocated)
+ ae.eventSystem.AddEvent(event)
+}
+
+func (ae *ApplicationEvents) SendRemoveAllocationEvent(appID, allocKey string,
allocated *resources.Resource, terminationType si.TerminationType) {
+ if !ae.eventSystem.IsEventTrackingEnabled() {
+ return
+ }
+
+ var eventChangeDetail si.EventRecord_ChangeDetail
+ switch terminationType {
+ case si.TerminationType_UNKNOWN_TERMINATION_TYPE:
+ eventChangeDetail = si.EventRecord_ALLOC_NODEREMOVED
+ case si.TerminationType_STOPPED_BY_RM:
+ eventChangeDetail = si.EventRecord_ALLOC_CANCEL
+ case si.TerminationType_TIMEOUT:
+ eventChangeDetail = si.EventRecord_ALLOC_TIMEOUT
+ case si.TerminationType_PREEMPTED_BY_SCHEDULER:
+ eventChangeDetail = si.EventRecord_ALLOC_PREEMPT
+ case si.TerminationType_PLACEHOLDER_REPLACED:
+ eventChangeDetail = si.EventRecord_ALLOC_REPLACED
+ }
+
+ event := events.CreateAppEventRecord(appID, common.Empty, allocKey,
si.EventRecord_REMOVE, eventChangeDetail, allocated)
+ ae.eventSystem.AddEvent(event)
+}
+
+func (ae *ApplicationEvents) SendRemoveAskEvent(appID, allocKey string,
allocated *resources.Resource, detail si.EventRecord_ChangeDetail) {
+ if !ae.eventSystem.IsEventTrackingEnabled() {
+ return
+ }
+ event := events.CreateAppEventRecord(appID, common.Empty, allocKey,
si.EventRecord_REMOVE, detail, allocated)
+ ae.eventSystem.AddEvent(event)
+}
+
+func (ae *ApplicationEvents) SendNewApplicationEvent(appID string) {
+ if !ae.eventSystem.IsEventTrackingEnabled() {
+ return
+ }
+ event := events.CreateAppEventRecord(appID, common.Empty, common.Empty,
si.EventRecord_ADD, si.EventRecord_APP_NEW, nil)
+ ae.eventSystem.AddEvent(event)
+}
+
+func (ae *ApplicationEvents) SendRemoveApplicationEvent(appID string) {
+ if !ae.eventSystem.IsEventTrackingEnabled() {
+ return
+ }
+ event := events.CreateAppEventRecord(appID, common.Empty, common.Empty,
si.EventRecord_REMOVE, si.EventRecord_DETAILS_NONE, nil)
+ ae.eventSystem.AddEvent(event)
+}
+
+func (ae *ApplicationEvents) SendStateChangeEvent(appID string, changeDetail
si.EventRecord_ChangeDetail, eventInfo string) {
+ if !ae.eventSystem.IsEventTrackingEnabled() {
+ return
+ }
+ event := events.CreateAppEventRecord(appID, eventInfo, common.Empty,
si.EventRecord_SET, changeDetail, nil)
+ ae.eventSystem.AddEvent(event)
+}
+
+func (ae *ApplicationEvents) SendAppNotRunnableInQueueEvent(appID string) {
+ if !ae.eventSystem.IsEventTrackingEnabled() {
+ return
+ }
+ event := events.CreateAppEventRecord(appID, common.Empty, common.Empty,
si.EventRecord_NONE, si.EventRecord_APP_CANNOTRUN_QUEUE, nil)
+ ae.eventSystem.AddEvent(event)
+}
+
+func (ae *ApplicationEvents) SendAppRunnableInQueueEvent(appID string) {
+ if !ae.eventSystem.IsEventTrackingEnabled() {
+ return
+ }
+ event := events.CreateAppEventRecord(appID, common.Empty, common.Empty,
si.EventRecord_NONE, si.EventRecord_APP_RUNNABLE_QUEUE, nil)
+ ae.eventSystem.AddEvent(event)
+}
+
+func (ae *ApplicationEvents) SendAppNotRunnableQuotaEvent(appID string) {
+ if !ae.eventSystem.IsEventTrackingEnabled() {
+ return
+ }
+ event := events.CreateAppEventRecord(appID, common.Empty, common.Empty,
si.EventRecord_NONE, si.EventRecord_APP_CANNOTRUN_QUOTA, nil)
+ ae.eventSystem.AddEvent(event)
+}
+
+func (ae *ApplicationEvents) SendAppRunnableQuotaEvent(appID string) {
+ if !ae.eventSystem.IsEventTrackingEnabled() {
+ return
+ }
+ event := events.CreateAppEventRecord(appID, common.Empty, common.Empty,
si.EventRecord_NONE, si.EventRecord_APP_RUNNABLE_QUOTA, nil)
+ ae.eventSystem.AddEvent(event)
+}
+
+func NewApplicationEvents(es events.EventSystem) *ApplicationEvents {
+ return &ApplicationEvents{
+ eventSystem: es,
+ }
+}
diff --git a/pkg/scheduler/objects/application_events_test.go b/pkg/scheduler/objects/events/application_events_test.go
similarity index 62%
rename from pkg/scheduler/objects/application_events_test.go
rename to pkg/scheduler/objects/events/application_events_test.go
index 94f6b40b..6f24a221 100644
--- a/pkg/scheduler/objects/application_events_test.go
+++ b/pkg/scheduler/objects/events/application_events_test.go
@@ -16,7 +16,7 @@
limitations under the License.
*/
-package objects
+package events
import (
"testing"
@@ -24,187 +24,144 @@ import (
"gotest.tools/v3/assert"
"github.com/apache/yunikorn-core/pkg/common"
+ "github.com/apache/yunikorn-core/pkg/common/resources"
"github.com/apache/yunikorn-core/pkg/events/mock"
"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
)
-func isNewApplicationEvent(t *testing.T, app *Application, record
*si.EventRecord) {
- assert.Equal(t, si.EventRecord_APP, record.Type, "incorrect event type,
expect app")
- assert.Equal(t, app.ApplicationID, record.ObjectID, "incorrect object
ID, expected application ID")
- assert.Equal(t, si.EventRecord_ADD, record.EventChangeType, "incorrect
change type, expected add")
- assert.Equal(t, si.EventRecord_APP_NEW, record.EventChangeDetail,
"incorrect change detail, expected none")
-}
-
-func isRemoveApplicationEvent(t *testing.T, app *Application, record
*si.EventRecord) {
- assert.Equal(t, si.EventRecord_APP, record.Type, "incorrect event type,
expect app")
- assert.Equal(t, app.ApplicationID, record.ObjectID, "incorrect object
ID, expected application ID")
- assert.Equal(t, si.EventRecord_REMOVE, record.EventChangeType,
"incorrect change type, expected remove")
- assert.Equal(t, si.EventRecord_DETAILS_NONE, record.EventChangeDetail,
"incorrect change detail, expected none")
-}
-
-func isStateChangeEvent(t *testing.T, app *Application, changeDetail
si.EventRecord_ChangeDetail, record *si.EventRecord) {
- assert.Equal(t, si.EventRecord_APP, record.Type, "incorrect event type,
expect app")
- assert.Equal(t, app.ApplicationID, record.ObjectID, "incorrect object
ID, expected application ID")
- assert.Equal(t, si.EventRecord_SET, record.EventChangeType, "incorrect
change type, expected set")
- assert.Equal(t, changeDetail, record.EventChangeDetail, "incorrect
change detail")
-}
+const (
+ taskGroup = "tg-0"
+ appID = "app-0"
+ allocKey = "alloc-0"
+)
func TestSendPlaceholderLargerEvent(t *testing.T) {
eventSystem := mock.NewEventSystemDisabled()
- appEvents := newApplicationEvents(eventSystem)
- appEvents.sendPlaceholderLargerEvent(&Allocation{}, &AllocationAsk{})
+ appEvents := NewApplicationEvents(eventSystem)
+ appEvents.SendPlaceholderLargerEvent(taskGroup, appID, allocKey,
resources.NewResource(), resources.NewResource())
assert.Equal(t, 0, len(eventSystem.Events), "unexpected event")
eventSystem = mock.NewEventSystem()
- appEvents = newApplicationEvents(eventSystem)
- appEvents.sendPlaceholderLargerEvent(&Allocation{
- allocationKey: aKey,
- }, &AllocationAsk{
- applicationID: appID0,
- allocationKey: aKey,
- })
+ appEvents = NewApplicationEvents(eventSystem)
+ appEvents.SendPlaceholderLargerEvent(taskGroup, appID, allocKey,
resources.NewResource(), resources.NewResource())
assert.Equal(t, 1, len(eventSystem.Events), "event was not generated")
}
func TestSendNewAllocationEvent(t *testing.T) {
eventSystem := mock.NewEventSystemDisabled()
- appEvents := newApplicationEvents(eventSystem)
- appEvents.sendNewAllocationEvent(&Allocation{})
+ appEvents := NewApplicationEvents(eventSystem)
+ appEvents.SendNewAllocationEvent(appID, allocKey,
resources.NewResource())
assert.Equal(t, 0, len(eventSystem.Events), "unexpected event")
eventSystem = mock.NewEventSystem()
- appEvents = newApplicationEvents(eventSystem)
+ appEvents = NewApplicationEvents(eventSystem)
assert.Assert(t, appEvents.eventSystem != nil, "event system should not
be nil")
- appEvents.sendNewAllocationEvent(&Allocation{
- applicationID: appID0,
- allocationKey: aKey,
- })
+ appEvents.SendNewAllocationEvent(appID, allocKey,
resources.NewResource())
assert.Equal(t, 1, len(eventSystem.Events), "event was not generated")
assert.Equal(t, si.EventRecord_APP, eventSystem.Events[0].Type, "event
type is not expected")
assert.Equal(t, si.EventRecord_ADD,
eventSystem.Events[0].EventChangeType, "event change type is not expected")
assert.Equal(t, si.EventRecord_APP_ALLOC,
eventSystem.Events[0].EventChangeDetail, "event change detail is not expected")
- assert.Equal(t, appID0, eventSystem.Events[0].ObjectID, "event object
id is not expected")
- assert.Equal(t, aKey, eventSystem.Events[0].ReferenceID, "event
reference id is not expected")
+ assert.Equal(t, appID, eventSystem.Events[0].ObjectID, "event object id
is not expected")
+ assert.Equal(t, allocKey, eventSystem.Events[0].ReferenceID, "event
reference id is not expected")
assert.Equal(t, common.Empty, eventSystem.Events[0].Message, "message
is not expected")
}
func TestSendNewAskEvent(t *testing.T) {
eventSystem := mock.NewEventSystemDisabled()
- appEvents := newApplicationEvents(eventSystem)
- appEvents.sendNewAskEvent(&AllocationAsk{})
+ appEvents := NewApplicationEvents(eventSystem)
+ appEvents.SendNewAskEvent(appID, allocKey, resources.NewResource())
assert.Equal(t, 0, len(eventSystem.Events), "unexpected event")
eventSystem = mock.NewEventSystem()
- appEvents = newApplicationEvents(eventSystem)
+ appEvents = NewApplicationEvents(eventSystem)
assert.Assert(t, appEvents.eventSystem != nil, "event system should not
be nil")
- appEvents.sendNewAskEvent(&AllocationAsk{
- applicationID: appID0,
- allocationKey: aKey,
- })
+ appEvents.SendNewAskEvent(appID, allocKey, resources.NewResource())
assert.Equal(t, 1, len(eventSystem.Events), "event was not generated")
assert.Equal(t, si.EventRecord_APP, eventSystem.Events[0].Type, "event
type is not expected")
assert.Equal(t, si.EventRecord_ADD,
eventSystem.Events[0].EventChangeType, "event change type is not expected")
assert.Equal(t, si.EventRecord_APP_REQUEST,
eventSystem.Events[0].EventChangeDetail, "event change detail is not expected")
- assert.Equal(t, appID0, eventSystem.Events[0].ObjectID, "event object
id is not expected")
- assert.Equal(t, aKey, eventSystem.Events[0].ReferenceID, "event
reference id is not expected")
+ assert.Equal(t, appID, eventSystem.Events[0].ObjectID, "event object id
is not expected")
+ assert.Equal(t, allocKey, eventSystem.Events[0].ReferenceID, "event
reference id is not expected")
assert.Equal(t, common.Empty, eventSystem.Events[0].Message, "message
is not expected")
}
func TestSendRemoveAllocationEvent(t *testing.T) {
eventSystem := mock.NewEventSystemDisabled()
- appEvents := newApplicationEvents(eventSystem)
- appEvents.sendRemoveAllocationEvent(&Allocation{},
si.TerminationType_STOPPED_BY_RM)
+ appEvents := NewApplicationEvents(eventSystem)
+ appEvents.SendRemoveAllocationEvent(appID, allocKey,
resources.NewResource(), si.TerminationType_STOPPED_BY_RM)
assert.Equal(t, 0, len(eventSystem.Events), "unexpected event")
testCases := []struct {
name string
eventSystemMock *mock.EventSystem
terminationType si.TerminationType
- allocation *Allocation
expectedEventCnt int
expectedType si.EventRecord_Type
expectedChangeType si.EventRecord_ChangeType
expectedChangeDetail si.EventRecord_ChangeDetail
- expectedObjectID string
- expectedReferenceID string
}{
{
name: "remove allocation cause of node
removal",
eventSystemMock: mock.NewEventSystem(),
terminationType:
si.TerminationType_UNKNOWN_TERMINATION_TYPE,
- allocation: &Allocation{applicationID:
appID0, allocationKey: aKey},
expectedEventCnt: 1,
expectedType: si.EventRecord_APP,
expectedChangeType: si.EventRecord_REMOVE,
expectedChangeDetail: si.EventRecord_ALLOC_NODEREMOVED,
- expectedObjectID: appID0,
- expectedReferenceID: aKey,
},
{
name: "remove allocation cause of
resource manager cancel",
eventSystemMock: mock.NewEventSystem(),
terminationType: si.TerminationType_STOPPED_BY_RM,
- allocation: &Allocation{applicationID:
appID0, allocationKey: aKey},
expectedEventCnt: 1,
expectedType: si.EventRecord_APP,
expectedChangeType: si.EventRecord_REMOVE,
expectedChangeDetail: si.EventRecord_ALLOC_CANCEL,
- expectedObjectID: appID0,
- expectedReferenceID: aKey,
},
{
name: "remove allocation cause of
timeout",
eventSystemMock: mock.NewEventSystem(),
terminationType: si.TerminationType_TIMEOUT,
- allocation: &Allocation{applicationID:
appID0, allocationKey: aKey},
expectedEventCnt: 1,
expectedType: si.EventRecord_APP,
expectedChangeType: si.EventRecord_REMOVE,
expectedChangeDetail: si.EventRecord_ALLOC_TIMEOUT,
- expectedObjectID: appID0,
- expectedReferenceID: aKey,
},
{
name: "remove allocation cause of
preemption",
eventSystemMock: mock.NewEventSystem(),
terminationType:
si.TerminationType_PREEMPTED_BY_SCHEDULER,
- allocation: &Allocation{applicationID:
appID0, allocationKey: aKey},
expectedEventCnt: 1,
expectedType: si.EventRecord_APP,
expectedChangeType: si.EventRecord_REMOVE,
expectedChangeDetail: si.EventRecord_ALLOC_PREEMPT,
- expectedObjectID: appID0,
- expectedReferenceID: aKey,
},
{
name: "remove allocation cause of
replacement",
eventSystemMock: mock.NewEventSystem(),
terminationType:
si.TerminationType_PLACEHOLDER_REPLACED,
- allocation: &Allocation{applicationID:
appID0, allocationKey: aKey},
expectedEventCnt: 1,
expectedType: si.EventRecord_APP,
expectedChangeType: si.EventRecord_REMOVE,
expectedChangeDetail: si.EventRecord_ALLOC_REPLACED,
- expectedObjectID: appID0,
- expectedReferenceID: aKey,
},
}
for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
if testCase.eventSystemMock == nil {
- appEvents := newApplicationEvents(nil)
- assert.Assert(t, appEvents.eventSystem == nil, "event system should be nil")
- appEvents.sendRemoveAllocationEvent(testCase.allocation, testCase.terminationType)
+ appEvt := NewApplicationEvents(nil)
+ assert.Assert(t, appEvt.eventSystem == nil,
"event system should be nil")
+ appEvt.SendRemoveAllocationEvent(appID,
allocKey, resources.NewResource(), testCase.terminationType)
} else {
- appEvents := newApplicationEvents(testCase.eventSystemMock)
- assert.Assert(t, appEvents.eventSystem != nil, "event system should not be nil")
- appEvents.sendRemoveAllocationEvent(testCase.allocation, testCase.terminationType)
+ appEvt :=
NewApplicationEvents(testCase.eventSystemMock)
+ assert.Assert(t, appEvt.eventSystem != nil,
"event system should not be nil")
+ appEvt.SendRemoveAllocationEvent(appID,
allocKey, resources.NewResource(), testCase.terminationType)
assert.Equal(t, testCase.expectedEventCnt,
len(testCase.eventSystemMock.Events), "event was not generated")
assert.Equal(t, testCase.expectedType,
testCase.eventSystemMock.Events[0].Type, "event type is not expected")
assert.Equal(t, testCase.expectedChangeType,
testCase.eventSystemMock.Events[0].EventChangeType, "event change type is not
expected")
assert.Equal(t, testCase.expectedChangeDetail,
testCase.eventSystemMock.Events[0].EventChangeDetail, "event change detail is
not expected")
- assert.Equal(t, testCase.expectedObjectID,
testCase.eventSystemMock.Events[0].ObjectID, "event object id is not expected")
- assert.Equal(t, testCase.expectedReferenceID,
testCase.eventSystemMock.Events[0].ReferenceID, "event reference id is not
expected")
+ assert.Equal(t, appID,
testCase.eventSystemMock.Events[0].ObjectID, "event object id is not expected")
+ assert.Equal(t, allocKey,
testCase.eventSystemMock.Events[0].ReferenceID, "event reference id is not
expected")
assert.Equal(t, common.Empty,
testCase.eventSystemMock.Events[0].Message, "message is not expected")
}
})
@@ -213,44 +170,41 @@ func TestSendRemoveAllocationEvent(t *testing.T) {
func TestSendRemoveAskEvent(t *testing.T) {
eventSystem := mock.NewEventSystemDisabled()
- appEvents := newApplicationEvents(eventSystem)
- appEvents.sendRemoveAskEvent(&AllocationAsk{},
si.EventRecord_REQUEST_CANCEL)
+ appEvents := NewApplicationEvents(eventSystem)
+ appEvents.SendRemoveAskEvent(appID, allocKey, resources.NewResource(),
si.EventRecord_REQUEST_CANCEL)
assert.Equal(t, 0, len(eventSystem.Events), "unexpected event")
- ask := &AllocationAsk{
- applicationID: appID0,
- allocationKey: aKey}
eventSystem = mock.NewEventSystem()
- appEvents = newApplicationEvents(eventSystem)
- appEvents.sendRemoveAskEvent(ask, si.EventRecord_REQUEST_CANCEL)
+ appEvents = NewApplicationEvents(eventSystem)
+ appEvents.SendRemoveAskEvent(appID, allocKey, resources.NewResource(),
si.EventRecord_REQUEST_CANCEL)
event := eventSystem.Events[0]
assert.Equal(t, si.EventRecord_APP, event.Type)
assert.Equal(t, si.EventRecord_REMOVE, event.EventChangeType)
assert.Equal(t, si.EventRecord_REQUEST_CANCEL, event.EventChangeDetail)
assert.Equal(t, "app-0", event.ObjectID)
- assert.Equal(t, "alloc-1", event.ReferenceID)
+ assert.Equal(t, "alloc-0", event.ReferenceID)
assert.Equal(t, "", event.Message)
eventSystem.Reset()
- appEvents.sendRemoveAskEvent(ask, si.EventRecord_REQUEST_TIMEOUT)
+ appEvents.SendRemoveAskEvent(appID, allocKey, resources.NewResource(),
si.EventRecord_REQUEST_TIMEOUT)
event = eventSystem.Events[0]
assert.Equal(t, si.EventRecord_APP, event.Type)
assert.Equal(t, si.EventRecord_REMOVE, event.EventChangeType)
assert.Equal(t, si.EventRecord_REQUEST_TIMEOUT, event.EventChangeDetail)
assert.Equal(t, "app-0", event.ObjectID)
- assert.Equal(t, "alloc-1", event.ReferenceID)
+ assert.Equal(t, "alloc-0", event.ReferenceID)
assert.Equal(t, "", event.Message)
}
func TestSendNewApplicationEvent(t *testing.T) {
eventSystem := mock.NewEventSystemDisabled()
- appEvents := newApplicationEvents(eventSystem)
- appEvents.sendNewApplicationEvent(appID0)
+ appEvents := NewApplicationEvents(eventSystem)
+ appEvents.SendNewApplicationEvent(appID)
assert.Equal(t, 0, len(eventSystem.Events), "unexpected event")
mockEvents := mock.NewEventSystem()
- appEvents = newApplicationEvents(mockEvents)
- appEvents.sendNewApplicationEvent(appID0)
+ appEvents = NewApplicationEvents(mockEvents)
+ appEvents.SendNewApplicationEvent(appID)
event := mockEvents.Events[0]
assert.Equal(t, si.EventRecord_APP, event.Type)
assert.Equal(t, si.EventRecord_ADD, event.EventChangeType)
@@ -262,13 +216,13 @@ func TestSendNewApplicationEvent(t *testing.T) {
func TestSendRemoveApplicationEvent(t *testing.T) {
eventSystem := mock.NewEventSystemDisabled()
- appEvents := newApplicationEvents(eventSystem)
- appEvents.sendRemoveApplicationEvent(appID0)
+ appEvents := NewApplicationEvents(eventSystem)
+ appEvents.SendRemoveApplicationEvent(appID)
assert.Equal(t, 0, len(eventSystem.Events), "unexpected event")
eventSystem = mock.NewEventSystem()
- appEvents = newApplicationEvents(eventSystem)
- appEvents.sendRemoveApplicationEvent(appID0)
+ appEvents = NewApplicationEvents(eventSystem)
+ appEvents.SendRemoveApplicationEvent(appID)
event := eventSystem.Events[0]
assert.Equal(t, si.EventRecord_APP, event.Type)
assert.Equal(t, si.EventRecord_REMOVE, event.EventChangeType)
@@ -280,13 +234,13 @@ func TestSendRemoveApplicationEvent(t *testing.T) {
func TestSendStateChangeEvent(t *testing.T) {
eventSystem := mock.NewEventSystemDisabled()
- appEvents := newApplicationEvents(eventSystem)
- appEvents.sendStateChangeEvent(appID0, si.EventRecord_APP_RUNNING, "")
+ appEvents := NewApplicationEvents(eventSystem)
+ appEvents.SendStateChangeEvent(appID, si.EventRecord_APP_RUNNING, "")
assert.Equal(t, 0, len(eventSystem.Events), "unexpected event")
eventSystem = mock.NewEventSystem()
- appEvents = newApplicationEvents(eventSystem)
- appEvents.sendStateChangeEvent(appID0, si.EventRecord_APP_RUNNING, "The
application is running")
+ appEvents = NewApplicationEvents(eventSystem)
+ appEvents.SendStateChangeEvent(appID, si.EventRecord_APP_RUNNING, "The
application is running")
event := eventSystem.Events[0]
assert.Equal(t, si.EventRecord_APP, event.Type)
assert.Equal(t, si.EventRecord_SET, event.EventChangeType)
@@ -296,13 +250,13 @@ func TestSendStateChangeEvent(t *testing.T) {
assert.Equal(t, "The application is running", event.Message)
eventSystem = mock.NewEventSystemDisabled()
- appEvents = newApplicationEvents(eventSystem)
- appEvents.sendStateChangeEvent(appID0, si.EventRecord_APP_RUNNING,
"ResourceReservationTimeout")
+ appEvents = NewApplicationEvents(eventSystem)
+ appEvents.SendStateChangeEvent(appID, si.EventRecord_APP_RUNNING,
"ResourceReservationTimeout")
assert.Equal(t, 0, len(eventSystem.Events), "unexpected event")
eventSystem = mock.NewEventSystem()
- appEvents = newApplicationEvents(eventSystem)
- appEvents.sendStateChangeEvent(appID0, si.EventRecord_APP_REJECT,
"Failed to add application to partition (placement rejected)")
+ appEvents = NewApplicationEvents(eventSystem)
+ appEvents.SendStateChangeEvent(appID, si.EventRecord_APP_REJECT,
"Failed to add application to partition (placement rejected)")
event = eventSystem.Events[0]
assert.Equal(t, si.EventRecord_APP, event.Type)
assert.Equal(t, si.EventRecord_SET, event.EventChangeType)
@@ -314,13 +268,13 @@ func TestSendStateChangeEvent(t *testing.T) {
func TestSendAppCannotRunInQueueEvent(t *testing.T) {
eventSystem := mock.NewEventSystemDisabled()
- appEvents := newApplicationEvents(eventSystem)
- appEvents.sendAppNotRunnableInQueueEvent(appID0)
+ appEvents := NewApplicationEvents(eventSystem)
+ appEvents.SendAppNotRunnableInQueueEvent(appID)
assert.Equal(t, 0, len(eventSystem.Events), "unexpected event")
eventSystem = mock.NewEventSystem()
- appEvents = newApplicationEvents(eventSystem)
- appEvents.sendAppNotRunnableInQueueEvent(appID0)
+ appEvents = NewApplicationEvents(eventSystem)
+ appEvents.SendAppNotRunnableInQueueEvent(appID)
event := eventSystem.Events[0]
assert.Equal(t, si.EventRecord_APP, event.Type)
assert.Equal(t, si.EventRecord_NONE, event.EventChangeType)
@@ -332,13 +286,13 @@ func TestSendAppCannotRunInQueueEvent(t *testing.T) {
func TestSendAppCannotRunByQuotaEvent(t *testing.T) {
eventSystem := mock.NewEventSystemDisabled()
- appEvents := newApplicationEvents(eventSystem)
- appEvents.sendAppNotRunnableQuotaEvent(appID0)
+ appEvents := NewApplicationEvents(eventSystem)
+ appEvents.SendAppNotRunnableQuotaEvent(appID)
assert.Equal(t, 0, len(eventSystem.Events), "unexpected event")
eventSystem = mock.NewEventSystem()
- appEvents = newApplicationEvents(eventSystem)
- appEvents.sendAppNotRunnableQuotaEvent(appID0)
+ appEvents = NewApplicationEvents(eventSystem)
+ appEvents.SendAppNotRunnableQuotaEvent(appID)
event := eventSystem.Events[0]
assert.Equal(t, si.EventRecord_APP, event.Type)
assert.Equal(t, si.EventRecord_NONE, event.EventChangeType)
@@ -350,13 +304,13 @@ func TestSendAppCannotRunByQuotaEvent(t *testing.T) {
func TestSendAppRunnableInQueueEvent(t *testing.T) {
eventSystem := mock.NewEventSystemDisabled()
- appEvents := newApplicationEvents(eventSystem)
- appEvents.sendAppRunnableInQueueEvent(appID0)
+ appEvents := NewApplicationEvents(eventSystem)
+ appEvents.SendAppRunnableInQueueEvent(appID)
assert.Equal(t, 0, len(eventSystem.Events), "unexpected event")
eventSystem = mock.NewEventSystem()
- appEvents = newApplicationEvents(eventSystem)
- appEvents.sendAppRunnableInQueueEvent(appID0)
+ appEvents = NewApplicationEvents(eventSystem)
+ appEvents.SendAppRunnableInQueueEvent(appID)
event := eventSystem.Events[0]
assert.Equal(t, si.EventRecord_APP, event.Type)
assert.Equal(t, si.EventRecord_NONE, event.EventChangeType)
@@ -368,13 +322,13 @@ func TestSendAppRunnableInQueueEvent(t *testing.T) {
func TestSendAppRunnableByQuotaEvent(t *testing.T) {
eventSystem := mock.NewEventSystemDisabled()
- appEvents := newApplicationEvents(eventSystem)
- appEvents.sendAppRunnableQuotaEvent(appID0)
+ appEvents := NewApplicationEvents(eventSystem)
+ appEvents.SendAppRunnableQuotaEvent(appID)
assert.Equal(t, 0, len(eventSystem.Events), "unexpected event")
eventSystem = mock.NewEventSystem()
- appEvents = newApplicationEvents(eventSystem)
- appEvents.sendAppRunnableQuotaEvent(appID0)
+ appEvents = NewApplicationEvents(eventSystem)
+ appEvents.SendAppRunnableQuotaEvent(appID)
event := eventSystem.Events[0]
assert.Equal(t, si.EventRecord_APP, event.Type)
assert.Equal(t, si.EventRecord_NONE, event.EventChangeType)
diff --git a/pkg/scheduler/objects/ask_events.go b/pkg/scheduler/objects/events/ask_events.go
similarity index 83%
rename from pkg/scheduler/objects/ask_events.go
rename to pkg/scheduler/objects/events/ask_events.go
index 755374c6..a2063ecd 100644
--- a/pkg/scheduler/objects/ask_events.go
+++ b/pkg/scheduler/objects/events/ask_events.go
@@ -16,7 +16,7 @@
limitations under the License.
*/
-package objects
+package events
import (
"fmt"
@@ -28,13 +28,13 @@ import (
"github.com/apache/yunikorn-core/pkg/events"
)
-// Ask-specific events. These events are of REQUEST type, so they are eventually sent to the respective pods in K8s.
-type askEvents struct {
+// AskEvents Ask-specific events. These events are of REQUEST type, so they are eventually sent to the respective pods in K8s.
+type AskEvents struct {
eventSystem events.EventSystem
limiter *rate.Limiter
}
-func (ae *askEvents) sendRequestExceedsQueueHeadroom(allocKey, appID string,
headroom, allocatedResource *resources.Resource, queuePath string) {
+func (ae *AskEvents) SendRequestExceedsQueueHeadroom(allocKey, appID string,
headroom, allocatedResource *resources.Resource, queuePath string) {
if !ae.eventSystem.IsEventTrackingEnabled() {
return
}
@@ -43,7 +43,7 @@ func (ae *askEvents) sendRequestExceedsQueueHeadroom(allocKey, appID string, hea
ae.eventSystem.AddEvent(event)
}
-func (ae *askEvents) sendRequestFitsInQueue(allocKey, appID, queuePath string,
allocatedResource *resources.Resource) {
+func (ae *AskEvents) SendRequestFitsInQueue(allocKey, appID, queuePath string,
allocatedResource *resources.Resource) {
if !ae.eventSystem.IsEventTrackingEnabled() {
return
}
@@ -52,7 +52,7 @@ func (ae *askEvents) sendRequestFitsInQueue(allocKey, appID, queuePath string, a
ae.eventSystem.AddEvent(event)
}
-func (ae *askEvents) sendRequestExceedsUserQuota(allocKey, appID string,
headroom, allocatedResource *resources.Resource) {
+func (ae *AskEvents) SendRequestExceedsUserQuota(allocKey, appID string,
headroom, allocatedResource *resources.Resource) {
if !ae.eventSystem.IsEventTrackingEnabled() {
return
}
@@ -61,7 +61,7 @@ func (ae *askEvents) sendRequestExceedsUserQuota(allocKey, appID string, headroo
ae.eventSystem.AddEvent(event)
}
-func (ae *askEvents) sendRequestFitsInUserQuota(allocKey, appID string,
allocatedResource *resources.Resource) {
+func (ae *AskEvents) SendRequestFitsInUserQuota(allocKey, appID string,
allocatedResource *resources.Resource) {
if !ae.eventSystem.IsEventTrackingEnabled() {
return
}
@@ -70,7 +70,7 @@ func (ae *askEvents) sendRequestFitsInUserQuota(allocKey, appID string, allocate
ae.eventSystem.AddEvent(event)
}
-func (ae *askEvents) sendPredicateFailed(allocKey, appID, predicateMsg string,
allocatedResource *resources.Resource) {
+func (ae *AskEvents) SendPredicateFailed(allocKey, appID, predicateMsg string,
allocatedResource *resources.Resource) {
if !ae.eventSystem.IsEventTrackingEnabled() || !ae.limiter.Allow() {
return
}
@@ -79,12 +79,12 @@ func (ae *askEvents) sendPredicateFailed(allocKey, appID, predicateMsg string, a
ae.eventSystem.AddEvent(event)
}
-func newAskEvents(evt events.EventSystem) *askEvents {
+func NewAskEvents(evt events.EventSystem) *AskEvents {
return newAskEventsWithRate(evt, 15*time.Second, 1)
}
-func newAskEventsWithRate(evt events.EventSystem, interval time.Duration,
burst int) *askEvents {
- return &askEvents{
+func newAskEventsWithRate(evt events.EventSystem, interval time.Duration,
burst int) *AskEvents {
+ return &AskEvents{
eventSystem: evt,
limiter: rate.NewLimiter(rate.Every(interval), burst),
}
diff --git a/pkg/scheduler/objects/ask_events_test.go b/pkg/scheduler/objects/events/ask_events_test.go
similarity index 76%
rename from pkg/scheduler/objects/ask_events_test.go
rename to pkg/scheduler/objects/events/ask_events_test.go
index 40342588..a3a661ef 100644
--- a/pkg/scheduler/objects/ask_events_test.go
+++ b/pkg/scheduler/objects/events/ask_events_test.go
@@ -16,7 +16,7 @@
limitations under the License.
*/
-package objects
+package events
import (
"strconv"
@@ -30,8 +30,6 @@ import (
"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
)
-const allocKey = "alloc-0"
-
var requestResource =
resources.NewResourceFromMap(map[string]resources.Quantity{
"memory": 100,
"cpu": 100,
@@ -40,17 +38,17 @@ var requestResource = resources.NewResourceFromMap(map[string]resources.Quantity
func TestRequestDoesNotFitInQueueEvent(t *testing.T) {
headroom :=
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 1})
eventSystem := mock.NewEventSystemDisabled()
- events := newAskEvents(eventSystem)
- events.sendRequestExceedsQueueHeadroom(allocKey, appID1, headroom,
requestResource, "root.test")
+ events := NewAskEvents(eventSystem)
+ events.SendRequestExceedsQueueHeadroom(allocKey, appID, headroom,
requestResource, "root.test")
assert.Equal(t, 0, len(eventSystem.Events))
eventSystem = mock.NewEventSystem()
- events = newAskEvents(eventSystem)
- events.sendRequestExceedsQueueHeadroom(allocKey, appID1, headroom,
requestResource, "root.test")
+ events = NewAskEvents(eventSystem)
+ events.SendRequestExceedsQueueHeadroom(allocKey, appID, headroom,
requestResource, "root.test")
assert.Equal(t, 1, len(eventSystem.Events))
event := eventSystem.Events[0]
assert.Equal(t, "alloc-0", event.ObjectID)
- assert.Equal(t, appID1, event.ReferenceID)
+ assert.Equal(t, appID, event.ReferenceID)
assert.Equal(t, si.EventRecord_REQUEST, event.Type)
assert.Equal(t, si.EventRecord_NONE, event.EventChangeType)
assert.Equal(t, si.EventRecord_DETAILS_NONE, event.EventChangeDetail)
@@ -59,17 +57,17 @@ func TestRequestDoesNotFitInQueueEvent(t *testing.T) {
func TestRequestFitsInQueueEvent(t *testing.T) {
eventSystem := mock.NewEventSystemDisabled()
- events := newAskEvents(eventSystem)
- events.sendRequestFitsInQueue(allocKey, appID1, "root.test",
requestResource)
+ events := NewAskEvents(eventSystem)
+ events.SendRequestFitsInQueue(allocKey, appID, "root.test",
requestResource)
assert.Equal(t, 0, len(eventSystem.Events))
eventSystem = mock.NewEventSystem()
- events = newAskEvents(eventSystem)
- events.sendRequestFitsInQueue(allocKey, appID1, "root.test",
requestResource)
+ events = NewAskEvents(eventSystem)
+ events.SendRequestFitsInQueue(allocKey, appID, "root.test",
requestResource)
assert.Equal(t, 1, len(eventSystem.Events))
event := eventSystem.Events[0]
assert.Equal(t, "alloc-0", event.ObjectID)
- assert.Equal(t, appID1, event.ReferenceID)
+ assert.Equal(t, appID, event.ReferenceID)
assert.Equal(t, si.EventRecord_REQUEST, event.Type)
assert.Equal(t, si.EventRecord_NONE, event.EventChangeType)
assert.Equal(t, si.EventRecord_DETAILS_NONE, event.EventChangeDetail)
@@ -79,17 +77,17 @@ func TestRequestFitsInQueueEvent(t *testing.T) {
func TestRequestExceedsUserQuotaEvent(t *testing.T) {
headroom :=
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 1})
eventSystem := mock.NewEventSystemDisabled()
- events := newAskEvents(eventSystem)
- events.sendRequestExceedsUserQuota(allocKey, appID1, headroom,
requestResource)
+ events := NewAskEvents(eventSystem)
+ events.SendRequestExceedsUserQuota(allocKey, appID, headroom,
requestResource)
assert.Equal(t, 0, len(eventSystem.Events))
eventSystem = mock.NewEventSystem()
- events = newAskEvents(eventSystem)
- events.sendRequestExceedsUserQuota(allocKey, appID1, headroom,
requestResource)
+ events = NewAskEvents(eventSystem)
+ events.SendRequestExceedsUserQuota(allocKey, appID, headroom,
requestResource)
assert.Equal(t, 1, len(eventSystem.Events))
event := eventSystem.Events[0]
assert.Equal(t, "alloc-0", event.ObjectID)
- assert.Equal(t, appID1, event.ReferenceID)
+ assert.Equal(t, appID, event.ReferenceID)
assert.Equal(t, si.EventRecord_REQUEST, event.Type)
assert.Equal(t, si.EventRecord_NONE, event.EventChangeType)
assert.Equal(t, si.EventRecord_DETAILS_NONE, event.EventChangeDetail)
@@ -98,17 +96,17 @@ func TestRequestExceedsUserQuotaEvent(t *testing.T) {
func TestRequestFitsInUserQuotaEvent(t *testing.T) {
eventSystem := mock.NewEventSystemDisabled()
- events := newAskEvents(eventSystem)
- events.sendRequestFitsInUserQuota(allocKey, appID1, requestResource)
+ events := NewAskEvents(eventSystem)
+ events.SendRequestFitsInUserQuota(allocKey, appID, requestResource)
assert.Equal(t, 0, len(eventSystem.Events))
eventSystem = mock.NewEventSystem()
- events = newAskEvents(eventSystem)
- events.sendRequestFitsInUserQuota(allocKey, appID1, requestResource)
+ events = NewAskEvents(eventSystem)
+ events.SendRequestFitsInUserQuota(allocKey, appID, requestResource)
assert.Equal(t, 1, len(eventSystem.Events))
event := eventSystem.Events[0]
assert.Equal(t, "alloc-0", event.ObjectID)
- assert.Equal(t, appID1, event.ReferenceID)
+ assert.Equal(t, appID, event.ReferenceID)
assert.Equal(t, si.EventRecord_REQUEST, event.Type)
assert.Equal(t, si.EventRecord_NONE, event.EventChangeType)
assert.Equal(t, si.EventRecord_DETAILS_NONE, event.EventChangeDetail)
@@ -118,15 +116,15 @@ func TestRequestFitsInUserQuotaEvent(t *testing.T) {
func TestPredicateFailedEvents(t *testing.T) {
resource :=
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 1})
eventSystem := mock.NewEventSystemDisabled()
- events := newAskEvents(eventSystem)
- events.sendPredicateFailed("alloc-0", "app-0", "failed", resource)
+ events := NewAskEvents(eventSystem)
+ events.SendPredicateFailed("alloc-0", "app-0", "failed", resource)
assert.Equal(t, 0, len(eventSystem.Events))
eventSystem = mock.NewEventSystem()
events = newAskEventsWithRate(eventSystem, 50*time.Millisecond, 1)
// only the first event is expected to be emitted due to rate limiting
for i := 0; i < 200; i++ {
- events.sendPredicateFailed("alloc-0", "app-0",
"failure-"+strconv.FormatUint(uint64(i), 10), resource)
+ events.SendPredicateFailed("alloc-0", "app-0",
"failure-"+strconv.FormatUint(uint64(i), 10), resource)
}
assert.Equal(t, 1, len(eventSystem.Events))
event := eventSystem.Events[0]
@@ -135,7 +133,7 @@ func TestPredicateFailedEvents(t *testing.T) {
eventSystem.Reset()
// wait a bit, a new event is expected
time.Sleep(100 * time.Millisecond)
- events.sendPredicateFailed("alloc-0", "app-0", "failed", resource)
+ events.SendPredicateFailed("alloc-0", "app-0", "failed", resource)
assert.Equal(t, 1, len(eventSystem.Events))
event = eventSystem.Events[0]
assert.Equal(t, "Predicate failed for request 'alloc-0' with message:
'failed'", event.Message)
diff --git a/pkg/scheduler/objects/node_events.go b/pkg/scheduler/objects/events/node_events.go
similarity index 83%
rename from pkg/scheduler/objects/node_events.go
rename to pkg/scheduler/objects/events/node_events.go
index a01ceafd..09a5d9aa 100644
--- a/pkg/scheduler/objects/node_events.go
+++ b/pkg/scheduler/objects/events/node_events.go
@@ -16,7 +16,7 @@
limitations under the License.
*/
-package objects
+package events
import (
"github.com/apache/yunikorn-core/pkg/common"
@@ -25,11 +25,11 @@ import (
"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
)
-type nodeEvents struct {
+type NodeEvents struct {
eventSystem events.EventSystem
}
-func (n *nodeEvents) sendNodeAddedEvent(nodeID string, capacity
*resources.Resource) {
+func (n *NodeEvents) SendNodeAddedEvent(nodeID string, capacity
*resources.Resource) {
if !n.eventSystem.IsEventTrackingEnabled() {
return
}
@@ -38,7 +38,7 @@ func (n *nodeEvents) sendNodeAddedEvent(nodeID string, capacity *resources.Resou
n.eventSystem.AddEvent(event)
}
-func (n *nodeEvents) sendNodeRemovedEvent(nodeID string) {
+func (n *NodeEvents) SendNodeRemovedEvent(nodeID string) {
if !n.eventSystem.IsEventTrackingEnabled() {
return
}
@@ -47,7 +47,7 @@ func (n *nodeEvents) sendNodeRemovedEvent(nodeID string) {
n.eventSystem.AddEvent(event)
}
-func (n *nodeEvents) sendAllocationAddedEvent(nodeID, allocKey string, res
*resources.Resource) {
+func (n *NodeEvents) SendAllocationAddedEvent(nodeID, allocKey string, res
*resources.Resource) {
if !n.eventSystem.IsEventTrackingEnabled() {
return
}
@@ -56,7 +56,7 @@ func (n *nodeEvents) sendAllocationAddedEvent(nodeID, allocKey string, res *reso
n.eventSystem.AddEvent(event)
}
-func (n *nodeEvents) sendAllocationRemovedEvent(nodeID, allocKey string, res
*resources.Resource) {
+func (n *NodeEvents) SendAllocationRemovedEvent(nodeID, allocKey string, res
*resources.Resource) {
if !n.eventSystem.IsEventTrackingEnabled() {
return
}
@@ -65,7 +65,7 @@ func (n *nodeEvents) sendAllocationRemovedEvent(nodeID, allocKey string, res *re
n.eventSystem.AddEvent(event)
}
-func (n *nodeEvents) sendNodeSchedulableChangedEvent(nodeID string, ready
bool) {
+func (n *NodeEvents) SendNodeSchedulableChangedEvent(nodeID string, ready
bool) {
if !n.eventSystem.IsEventTrackingEnabled() {
return
}
@@ -80,7 +80,7 @@ func (n *nodeEvents) sendNodeSchedulableChangedEvent(nodeID string, ready bool)
n.eventSystem.AddEvent(event)
}
-func (n *nodeEvents) sendNodeCapacityChangedEvent(nodeID string, total
*resources.Resource) {
+func (n *NodeEvents) SendNodeCapacityChangedEvent(nodeID string, total
*resources.Resource) {
if !n.eventSystem.IsEventTrackingEnabled() {
return
}
@@ -89,7 +89,7 @@ func (n *nodeEvents) sendNodeCapacityChangedEvent(nodeID string, total *resource
n.eventSystem.AddEvent(event)
}
-func (n *nodeEvents) sendNodeOccupiedResourceChangedEvent(nodeID string,
occupied *resources.Resource) {
+func (n *NodeEvents) SendNodeOccupiedResourceChangedEvent(nodeID string,
occupied *resources.Resource) {
if !n.eventSystem.IsEventTrackingEnabled() {
return
}
@@ -98,7 +98,7 @@ func (n *nodeEvents) sendNodeOccupiedResourceChangedEvent(nodeID string, occupie
n.eventSystem.AddEvent(event)
}
-func (n *nodeEvents) sendReservedEvent(nodeID string, res *resources.Resource,
askID string) {
+func (n *NodeEvents) SendReservedEvent(nodeID string, res *resources.Resource,
askID string) {
if !n.eventSystem.IsEventTrackingEnabled() {
return
}
@@ -107,7 +107,7 @@ func (n *nodeEvents) sendReservedEvent(nodeID string, res *resources.Resource, a
n.eventSystem.AddEvent(event)
}
-func (n *nodeEvents) sendUnreservedEvent(nodeID string, res
*resources.Resource, askID string) {
+func (n *NodeEvents) SendUnreservedEvent(nodeID string, res
*resources.Resource, askID string) {
if !n.eventSystem.IsEventTrackingEnabled() {
return
}
@@ -116,8 +116,8 @@ func (n *nodeEvents) sendUnreservedEvent(nodeID string, res *resources.Resource,
n.eventSystem.AddEvent(event)
}
-func newNodeEvents(evt events.EventSystem) *nodeEvents {
- return &nodeEvents{
+func NewNodeEvents(evt events.EventSystem) *NodeEvents {
+ return &NodeEvents{
eventSystem: evt,
}
}
diff --git a/pkg/scheduler/objects/node_events_test.go b/pkg/scheduler/objects/events/node_events_test.go
similarity index 83%
rename from pkg/scheduler/objects/node_events_test.go
rename to pkg/scheduler/objects/events/node_events_test.go
index bafc4f93..f9d79121 100644
--- a/pkg/scheduler/objects/node_events_test.go
+++ b/pkg/scheduler/objects/events/node_events_test.go
@@ -16,7 +16,7 @@
limitations under the License.
*/
-package objects
+package events
import (
"testing"
@@ -29,16 +29,18 @@ import (
"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
)
+const nodeID1 = "node-1"
+
func TestSendNodeAddedEvent(t *testing.T) {
resource :=
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 1})
eventSystem := mock.NewEventSystemDisabled()
- ne := newNodeEvents(eventSystem)
- ne.sendNodeAddedEvent(nodeID1, resource)
+ ne := NewNodeEvents(eventSystem)
+ ne.SendNodeAddedEvent(nodeID1, resource)
assert.Equal(t, 0, len(eventSystem.Events), "unexpected event")
eventSystem = mock.NewEventSystem()
- ne = newNodeEvents(eventSystem)
- ne.sendNodeAddedEvent(nodeID1, resource)
+ ne = NewNodeEvents(eventSystem)
+ ne.SendNodeAddedEvent(nodeID1, resource)
assert.Equal(t, 1, len(eventSystem.Events), "event was not generated")
event := eventSystem.Events[0]
assert.Equal(t, nodeID1, event.ObjectID)
@@ -53,13 +55,13 @@ func TestSendNodeAddedEvent(t *testing.T) {
func TestSendNodeRemovedEvent(t *testing.T) {
eventSystem := mock.NewEventSystemDisabled()
- ne := newNodeEvents(eventSystem)
- ne.sendNodeRemovedEvent(nodeID1)
+ ne := NewNodeEvents(eventSystem)
+ ne.SendNodeRemovedEvent(nodeID1)
assert.Equal(t, 0, len(eventSystem.Events), "unexpected event")
eventSystem = mock.NewEventSystem()
- ne = newNodeEvents(eventSystem)
- ne.sendNodeRemovedEvent(nodeID1)
+ ne = NewNodeEvents(eventSystem)
+ ne.SendNodeRemovedEvent(nodeID1)
assert.Equal(t, 1, len(eventSystem.Events), "event was not generated")
event := eventSystem.Events[0]
assert.Equal(t, nodeID1, event.ObjectID)
@@ -74,13 +76,13 @@ func TestSendAllocationAddedEvent(t *testing.T) {
resource :=
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 1})
eventSystem := mock.NewEventSystemDisabled()
- ne := newNodeEvents(eventSystem)
- ne.sendAllocationAddedEvent(nodeID1, "alloc-0", resource)
+ ne := NewNodeEvents(eventSystem)
+ ne.SendAllocationAddedEvent(nodeID1, "alloc-0", resource)
assert.Equal(t, 0, len(eventSystem.Events), "unexpected event")
eventSystem = mock.NewEventSystem()
- ne = newNodeEvents(eventSystem)
- ne.sendAllocationAddedEvent(nodeID1, "alloc-0", resource)
+ ne = NewNodeEvents(eventSystem)
+ ne.SendAllocationAddedEvent(nodeID1, "alloc-0", resource)
assert.Equal(t, 1, len(eventSystem.Events), "event was not generated")
event := eventSystem.Events[0]
assert.Equal(t, nodeID1, event.ObjectID)
@@ -97,13 +99,13 @@ func TestSendAllocationRemovedEvent(t *testing.T) {
resource :=
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 1})
eventSystem := mock.NewEventSystemDisabled()
- ne := newNodeEvents(eventSystem)
- ne.sendAllocationRemovedEvent(nodeID1, "alloc-0", resource)
+ ne := NewNodeEvents(eventSystem)
+ ne.SendAllocationRemovedEvent(nodeID1, "alloc-0", resource)
assert.Equal(t, 0, len(eventSystem.Events), "unexpected event")
eventSystem = mock.NewEventSystem()
- ne = newNodeEvents(eventSystem)
- ne.sendAllocationRemovedEvent(nodeID1, "alloc-0", resource)
+ ne = NewNodeEvents(eventSystem)
+ ne.SendAllocationRemovedEvent(nodeID1, "alloc-0", resource)
event := eventSystem.Events[0]
assert.Equal(t, nodeID1, event.ObjectID)
assert.Equal(t, "alloc-0", event.ReferenceID)
@@ -118,13 +120,13 @@ func TestSendAllocationRemovedEvent(t *testing.T) {
func TestSendOccupiedResourceChangedEvent(t *testing.T) {
resource :=
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 1})
eventSystem := mock.NewEventSystemDisabled()
- ne := newNodeEvents(eventSystem)
- ne.sendNodeOccupiedResourceChangedEvent(nodeID1, resource)
+ ne := NewNodeEvents(eventSystem)
+ ne.SendNodeOccupiedResourceChangedEvent(nodeID1, resource)
assert.Equal(t, 0, len(eventSystem.Events), "unexpected event")
eventSystem = mock.NewEventSystem()
- ne = newNodeEvents(eventSystem)
- ne.sendNodeOccupiedResourceChangedEvent(nodeID1, resource)
+ ne = NewNodeEvents(eventSystem)
+ ne.SendNodeOccupiedResourceChangedEvent(nodeID1, resource)
assert.Equal(t, 1, len(eventSystem.Events), "event was not generated")
event := eventSystem.Events[0]
assert.Equal(t, nodeID1, event.ObjectID)
@@ -140,13 +142,13 @@ func TestSendOccupiedResourceChangedEvent(t *testing.T) {
func TestSendCapacityChangedEvent(t *testing.T) {
resource :=
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 1})
eventSystem := mock.NewEventSystemDisabled()
- ne := newNodeEvents(eventSystem)
- ne.sendNodeCapacityChangedEvent(nodeID1, resource)
+ ne := NewNodeEvents(eventSystem)
+ ne.SendNodeCapacityChangedEvent(nodeID1, resource)
assert.Equal(t, 0, len(eventSystem.Events), "unexpected event")
eventSystem = mock.NewEventSystem()
- ne = newNodeEvents(eventSystem)
- ne.sendNodeCapacityChangedEvent(nodeID1, resource)
+ ne = NewNodeEvents(eventSystem)
+ ne.SendNodeCapacityChangedEvent(nodeID1, resource)
assert.Equal(t, 1, len(eventSystem.Events), "event was not generated")
event := eventSystem.Events[0]
assert.Equal(t, nodeID1, event.ObjectID)
@@ -161,13 +163,13 @@ func TestSendCapacityChangedEvent(t *testing.T) {
func TestNodeSchedulableChangedEvent(t *testing.T) {
eventSystem := mock.NewEventSystemDisabled()
- ne := newNodeEvents(eventSystem)
- ne.sendNodeSchedulableChangedEvent(nodeID1, false)
+ ne := NewNodeEvents(eventSystem)
+ ne.SendNodeSchedulableChangedEvent(nodeID1, false)
assert.Equal(t, 0, len(eventSystem.Events), "unexpected event")
eventSystem = mock.NewEventSystem()
- ne = newNodeEvents(eventSystem)
- ne.sendNodeSchedulableChangedEvent(nodeID1, false)
+ ne = NewNodeEvents(eventSystem)
+ ne.SendNodeSchedulableChangedEvent(nodeID1, false)
assert.Equal(t, 1, len(eventSystem.Events), "event was not generated")
event := eventSystem.Events[0]
assert.Equal(t, nodeID1, event.ObjectID)
@@ -178,7 +180,7 @@ func TestNodeSchedulableChangedEvent(t *testing.T) {
assert.Equal(t, 0, len(event.Resource.Resources))
eventSystem.Reset()
- ne.sendNodeSchedulableChangedEvent(nodeID1, true)
+ ne.SendNodeSchedulableChangedEvent(nodeID1, true)
assert.Equal(t, 1, len(eventSystem.Events), "event was not generated")
event = eventSystem.Events[0]
assert.Equal(t, nodeID1, event.ObjectID)
@@ -192,13 +194,13 @@ func TestNodeSchedulableChangedEvent(t *testing.T) {
func TestNodeReservationEvent(t *testing.T) {
resource :=
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 1})
eventSystem := mock.NewEventSystemDisabled()
- ne := newNodeEvents(eventSystem)
- ne.sendReservedEvent(nodeID1, resource, "alloc-0")
+ ne := NewNodeEvents(eventSystem)
+ ne.SendReservedEvent(nodeID1, resource, "alloc-0")
assert.Equal(t, 0, len(eventSystem.Events), "unexpected event")
eventSystem = mock.NewEventSystem()
- ne = newNodeEvents(eventSystem)
- ne.sendReservedEvent(nodeID1, resource, "alloc-0")
+ ne = NewNodeEvents(eventSystem)
+ ne.SendReservedEvent(nodeID1, resource, "alloc-0")
assert.Equal(t, 1, len(eventSystem.Events), "event was not generated")
event := eventSystem.Events[0]
assert.Equal(t, nodeID1, event.ObjectID)
@@ -214,13 +216,13 @@ func TestNodeReservationEvent(t *testing.T) {
func TestNodeUnreservationEvent(t *testing.T) {
resource :=
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 1})
eventSystem := mock.NewEventSystemDisabled()
- ne := newNodeEvents(eventSystem)
- ne.sendUnreservedEvent(nodeID1, resource, "alloc-0")
+ ne := NewNodeEvents(eventSystem)
+ ne.SendUnreservedEvent(nodeID1, resource, "alloc-0")
assert.Equal(t, 0, len(eventSystem.Events), "unexpected event")
eventSystem = mock.NewEventSystem()
- ne = newNodeEvents(eventSystem)
- ne.sendUnreservedEvent(nodeID1, resource, "alloc-0")
+ ne = NewNodeEvents(eventSystem)
+ ne.SendUnreservedEvent(nodeID1, resource, "alloc-0")
assert.Equal(t, 1, len(eventSystem.Events), "event was not generated")
event := eventSystem.Events[0]
assert.Equal(t, nodeID1, event.ObjectID)
diff --git a/pkg/scheduler/objects/queue_events.go
b/pkg/scheduler/objects/events/queue_events.go
similarity index 84%
rename from pkg/scheduler/objects/queue_events.go
rename to pkg/scheduler/objects/events/queue_events.go
index 7ef24d3f..f33e0066 100644
--- a/pkg/scheduler/objects/queue_events.go
+++ b/pkg/scheduler/objects/events/queue_events.go
@@ -16,7 +16,7 @@
limitations under the License.
*/
-package objects
+package events
import (
"github.com/apache/yunikorn-core/pkg/common"
@@ -25,11 +25,11 @@ import (
"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
)
-type queueEvents struct {
+type QueueEvents struct {
eventSystem events.EventSystem
}
-func (q *queueEvents) sendNewQueueEvent(queuePath string, managed bool) {
+func (q *QueueEvents) SendNewQueueEvent(queuePath string, managed bool) {
if !q.eventSystem.IsEventTrackingEnabled() {
return
}
@@ -42,7 +42,7 @@ func (q *queueEvents) sendNewQueueEvent(queuePath string,
managed bool) {
q.eventSystem.AddEvent(event)
}
-func (q *queueEvents) sendNewApplicationEvent(queuePath, appID string) {
+func (q *QueueEvents) SendNewApplicationEvent(queuePath, appID string) {
if !q.eventSystem.IsEventTrackingEnabled() {
return
}
@@ -51,7 +51,7 @@ func (q *queueEvents) sendNewApplicationEvent(queuePath,
appID string) {
q.eventSystem.AddEvent(event)
}
-func (q *queueEvents) sendRemoveQueueEvent(queuePath string, managed bool) {
+func (q *QueueEvents) SendRemoveQueueEvent(queuePath string, managed bool) {
if !q.eventSystem.IsEventTrackingEnabled() {
return
}
@@ -64,7 +64,7 @@ func (q *queueEvents) sendRemoveQueueEvent(queuePath string,
managed bool) {
q.eventSystem.AddEvent(event)
}
-func (q *queueEvents) sendRemoveApplicationEvent(queuePath, appID string) {
+func (q *QueueEvents) SendRemoveApplicationEvent(queuePath, appID string) {
if !q.eventSystem.IsEventTrackingEnabled() {
return
}
@@ -73,7 +73,7 @@ func (q *queueEvents) sendRemoveApplicationEvent(queuePath,
appID string) {
q.eventSystem.AddEvent(event)
}
-func (q *queueEvents) sendMaxResourceChangedEvent(queuePath string,
maxResource *resources.Resource) {
+func (q *QueueEvents) SendMaxResourceChangedEvent(queuePath string,
maxResource *resources.Resource) {
if !q.eventSystem.IsEventTrackingEnabled() {
return
}
@@ -82,7 +82,7 @@ func (q *queueEvents) sendMaxResourceChangedEvent(queuePath
string, maxResource
q.eventSystem.AddEvent(event)
}
-func (q *queueEvents) sendGuaranteedResourceChangedEvent(queuePath string,
guaranteed *resources.Resource) {
+func (q *QueueEvents) SendGuaranteedResourceChangedEvent(queuePath string,
guaranteed *resources.Resource) {
if !q.eventSystem.IsEventTrackingEnabled() {
return
}
@@ -91,7 +91,7 @@ func (q *queueEvents)
sendGuaranteedResourceChangedEvent(queuePath string, guara
q.eventSystem.AddEvent(event)
}
-func (q *queueEvents) sendTypeChangedEvent(queuePath string, isLeaf bool) {
+func (q *QueueEvents) SendTypeChangedEvent(queuePath string, isLeaf bool) {
if !q.eventSystem.IsEventTrackingEnabled() {
return
}
@@ -104,8 +104,8 @@ func (q *queueEvents) sendTypeChangedEvent(queuePath
string, isLeaf bool) {
q.eventSystem.AddEvent(event)
}
-func newQueueEvents(evt events.EventSystem) *queueEvents {
- return &queueEvents{
+func NewQueueEvents(evt events.EventSystem) *QueueEvents {
+ return &QueueEvents{
eventSystem: evt,
}
}
diff --git a/pkg/scheduler/objects/queue_events_test.go
b/pkg/scheduler/objects/events/queue_events_test.go
similarity index 80%
rename from pkg/scheduler/objects/queue_events_test.go
rename to pkg/scheduler/objects/events/queue_events_test.go
index 07d781fb..bae61d85 100644
--- a/pkg/scheduler/objects/queue_events_test.go
+++ b/pkg/scheduler/objects/events/queue_events_test.go
@@ -16,7 +16,7 @@
limitations under the License.
*/
-package objects
+package events
import (
"testing"
@@ -34,18 +34,14 @@ const (
)
func TestSendNewQueueEvent(t *testing.T) {
- queue := &Queue{
- QueuePath: testQueuePath,
- isManaged: true,
- }
eventSystem := mock.NewEventSystemDisabled()
- nq := newQueueEvents(eventSystem)
- nq.sendNewQueueEvent(queue.QueuePath, false)
+ nq := NewQueueEvents(eventSystem)
+ nq.SendNewQueueEvent(testQueuePath, false)
assert.Equal(t, 0, len(eventSystem.Events), "unexpected event")
eventSystem = mock.NewEventSystem()
- nq = newQueueEvents(eventSystem)
- nq.sendNewQueueEvent(queue.QueuePath, true)
+ nq = NewQueueEvents(eventSystem)
+ nq.SendNewQueueEvent(testQueuePath, true)
assert.Equal(t, 1, len(eventSystem.Events), "event was not generated")
event := eventSystem.Events[0]
assert.Equal(t, si.EventRecord_QUEUE, event.Type)
@@ -56,21 +52,21 @@ func TestSendNewQueueEvent(t *testing.T) {
assert.Equal(t, si.EventRecord_DETAILS_NONE, event.EventChangeDetail)
assert.Equal(t, 0, len(event.Resource.Resources))
eventSystem = mock.NewEventSystem()
- nq = newQueueEvents(eventSystem)
- nq.sendNewQueueEvent(queue.QueuePath, false)
+ nq = NewQueueEvents(eventSystem)
+ nq.SendNewQueueEvent(testQueuePath, false)
event = eventSystem.Events[0]
assert.Equal(t, si.EventRecord_QUEUE_DYNAMIC, event.EventChangeDetail)
}
func TestSendRemoveQueueEvent(t *testing.T) {
eventSystem := mock.NewEventSystemDisabled()
- nq := newQueueEvents(eventSystem)
- nq.sendRemoveQueueEvent(testQueuePath, true)
+ nq := NewQueueEvents(eventSystem)
+ nq.SendRemoveQueueEvent(testQueuePath, true)
assert.Equal(t, 0, len(eventSystem.Events), "unexpected event")
eventSystem = mock.NewEventSystem()
- nq = newQueueEvents(eventSystem)
- nq.sendRemoveQueueEvent(testQueuePath, true)
+ nq = NewQueueEvents(eventSystem)
+ nq.SendRemoveQueueEvent(testQueuePath, true)
assert.Equal(t, 1, len(eventSystem.Events), "event was not generated")
event := eventSystem.Events[0]
assert.Equal(t, si.EventRecord_QUEUE, event.Type)
@@ -81,26 +77,26 @@ func TestSendRemoveQueueEvent(t *testing.T) {
assert.Equal(t, si.EventRecord_DETAILS_NONE, event.EventChangeDetail)
assert.Equal(t, 0, len(event.Resource.Resources))
eventSystem = mock.NewEventSystem()
- nq = newQueueEvents(eventSystem)
- nq.sendRemoveQueueEvent(testQueuePath, false)
+ nq = NewQueueEvents(eventSystem)
+ nq.SendRemoveQueueEvent(testQueuePath, false)
event = eventSystem.Events[0]
assert.Equal(t, si.EventRecord_QUEUE_DYNAMIC, event.EventChangeDetail)
}
func TestNewApplicationEvent(t *testing.T) {
eventSystem := mock.NewEventSystemDisabled()
- nq := newQueueEvents(eventSystem)
- nq.sendNewApplicationEvent(testQueuePath, appID0)
+ nq := NewQueueEvents(eventSystem)
+ nq.SendNewApplicationEvent(testQueuePath, appID)
assert.Equal(t, 0, len(eventSystem.Events), "unexpected event")
eventSystem = mock.NewEventSystem()
- nq = newQueueEvents(eventSystem)
- nq.sendNewApplicationEvent(testQueuePath, appID0)
+ nq = NewQueueEvents(eventSystem)
+ nq.SendNewApplicationEvent(testQueuePath, appID)
assert.Equal(t, 1, len(eventSystem.Events), "event was not generated")
event := eventSystem.Events[0]
assert.Equal(t, si.EventRecord_QUEUE, event.Type)
assert.Equal(t, testQueuePath, event.ObjectID)
- assert.Equal(t, appID0, event.ReferenceID)
+ assert.Equal(t, appID, event.ReferenceID)
assert.Equal(t, common.Empty, event.Message)
assert.Equal(t, si.EventRecord_ADD, event.EventChangeType)
assert.Equal(t, si.EventRecord_QUEUE_APP, event.EventChangeDetail)
@@ -109,18 +105,18 @@ func TestNewApplicationEvent(t *testing.T) {
func TestRemoveApplicationEvent(t *testing.T) {
eventSystem := mock.NewEventSystemDisabled()
- nq := newQueueEvents(eventSystem)
- nq.sendRemoveApplicationEvent(testQueuePath, appID0)
+ nq := NewQueueEvents(eventSystem)
+ nq.SendRemoveApplicationEvent(testQueuePath, appID)
assert.Equal(t, 0, len(eventSystem.Events), "unexpected event")
eventSystem = mock.NewEventSystem()
- nq = newQueueEvents(eventSystem)
- nq.sendRemoveApplicationEvent(testQueuePath, appID0)
+ nq = NewQueueEvents(eventSystem)
+ nq.SendRemoveApplicationEvent(testQueuePath, appID)
assert.Equal(t, 1, len(eventSystem.Events), "event was not generated")
event := eventSystem.Events[0]
assert.Equal(t, si.EventRecord_QUEUE, event.Type)
assert.Equal(t, testQueuePath, event.ObjectID)
- assert.Equal(t, appID0, event.ReferenceID)
+ assert.Equal(t, appID, event.ReferenceID)
assert.Equal(t, common.Empty, event.Message)
assert.Equal(t, si.EventRecord_REMOVE, event.EventChangeType)
assert.Equal(t, si.EventRecord_QUEUE_APP, event.EventChangeDetail)
@@ -129,13 +125,13 @@ func TestRemoveApplicationEvent(t *testing.T) {
func TestTypeChangedEvent(t *testing.T) {
eventSystem := mock.NewEventSystemDisabled()
- nq := newQueueEvents(eventSystem)
- nq.sendTypeChangedEvent(testQueuePath, true)
+ nq := NewQueueEvents(eventSystem)
+ nq.SendTypeChangedEvent(testQueuePath, true)
assert.Equal(t, 0, len(eventSystem.Events), "unexpected event")
eventSystem = mock.NewEventSystem()
- nq = newQueueEvents(eventSystem)
- nq.sendTypeChangedEvent(testQueuePath, false)
+ nq = NewQueueEvents(eventSystem)
+ nq.SendTypeChangedEvent(testQueuePath, false)
assert.Equal(t, 1, len(eventSystem.Events), "event was not generated")
event := eventSystem.Events[0]
assert.Equal(t, si.EventRecord_QUEUE, event.Type)
@@ -150,13 +146,13 @@ func TestTypeChangedEvent(t *testing.T) {
func TestSendMaxResourceChangedEvent(t *testing.T) {
maxRes :=
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 1})
eventSystem := mock.NewEventSystemDisabled()
- nq := newQueueEvents(eventSystem)
- nq.sendMaxResourceChangedEvent(testQueuePath, maxRes)
+ nq := NewQueueEvents(eventSystem)
+ nq.SendMaxResourceChangedEvent(testQueuePath, maxRes)
assert.Equal(t, 0, len(eventSystem.Events), "unexpected event")
eventSystem = mock.NewEventSystem()
- nq = newQueueEvents(eventSystem)
- nq.sendMaxResourceChangedEvent(testQueuePath, maxRes)
+ nq = NewQueueEvents(eventSystem)
+ nq.SendMaxResourceChangedEvent(testQueuePath, maxRes)
assert.Equal(t, 1, len(eventSystem.Events), "event was not generated")
event := eventSystem.Events[0]
assert.Equal(t, si.EventRecord_QUEUE, event.Type)
@@ -173,13 +169,13 @@ func TestSendMaxResourceChangedEvent(t *testing.T) {
func TestSendGuaranteedResourceChangedEvent(t *testing.T) {
guaranteed :=
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 1})
eventSystem := mock.NewEventSystemDisabled()
- nq := newQueueEvents(eventSystem)
- nq.sendGuaranteedResourceChangedEvent(testQueuePath, guaranteed)
+ nq := NewQueueEvents(eventSystem)
+ nq.SendGuaranteedResourceChangedEvent(testQueuePath, guaranteed)
assert.Equal(t, 0, len(eventSystem.Events), "unexpected event")
eventSystem = mock.NewEventSystem()
- nq = newQueueEvents(eventSystem)
- nq.sendGuaranteedResourceChangedEvent(testQueuePath, guaranteed)
+ nq = NewQueueEvents(eventSystem)
+ nq.SendGuaranteedResourceChangedEvent(testQueuePath, guaranteed)
assert.Equal(t, 1, len(eventSystem.Events), "event was not generated")
event := eventSystem.Events[0]
assert.Equal(t, si.EventRecord_QUEUE, event.Type)
diff --git a/pkg/scheduler/objects/node.go b/pkg/scheduler/objects/node.go
index 09efa2d3..26067b58 100644
--- a/pkg/scheduler/objects/node.go
+++ b/pkg/scheduler/objects/node.go
@@ -29,6 +29,7 @@ import (
"github.com/apache/yunikorn-core/pkg/locking"
"github.com/apache/yunikorn-core/pkg/log"
"github.com/apache/yunikorn-core/pkg/plugins"
+ schedEvt "github.com/apache/yunikorn-core/pkg/scheduler/objects/events"
"github.com/apache/yunikorn-scheduler-interface/lib/go/common"
"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
)
@@ -56,7 +57,7 @@ type Node struct {
reservations map[string]*reservation // a map of reservations
listeners []NodeListener // a list of node listeners
- nodeEvents *nodeEvents
+ nodeEvents *schedEvt.NodeEvents
locking.RWMutex
}
@@ -77,7 +78,7 @@ func NewNode(proto *si.NodeInfo) *Node {
schedulable: true,
listeners: make([]NodeListener, 0),
}
- sn.nodeEvents = newNodeEvents(events.GetEventSystem())
+ sn.nodeEvents = schedEvt.NewNodeEvents(events.GetEventSystem())
// initialise available resources
var err error
sn.availableResource, err =
resources.SubErrorNegative(sn.totalResource, sn.occupiedResource)
@@ -165,7 +166,7 @@ func (sn *Node) SetCapacity(newCapacity
*resources.Resource) *resources.Resource
delta := resources.Sub(newCapacity, sn.totalResource)
sn.totalResource = newCapacity
sn.refreshAvailableResource()
- sn.nodeEvents.sendNodeCapacityChangedEvent(sn.NodeID,
sn.totalResource.Clone())
+ sn.nodeEvents.SendNodeCapacityChangedEvent(sn.NodeID,
sn.totalResource.Clone())
return delta
}
@@ -184,7 +185,7 @@ func (sn *Node) SetOccupiedResource(occupiedResource
*resources.Resource) {
return
}
sn.occupiedResource = occupiedResource
- sn.nodeEvents.sendNodeOccupiedResourceChangedEvent(sn.NodeID,
sn.occupiedResource.Clone())
+ sn.nodeEvents.SendNodeOccupiedResourceChangedEvent(sn.NodeID,
sn.occupiedResource.Clone())
sn.refreshAvailableResource()
}
@@ -234,7 +235,7 @@ func (sn *Node) SetSchedulable(schedulable bool) {
sn.Lock()
defer sn.Unlock()
sn.schedulable = schedulable
- sn.nodeEvents.sendNodeSchedulableChangedEvent(sn.NodeID, sn.schedulable)
+ sn.nodeEvents.SendNodeSchedulableChangedEvent(sn.NodeID, sn.schedulable)
}
// Can this node be used in scheduling.
@@ -304,7 +305,7 @@ func (sn *Node) RemoveAllocation(allocationKey string)
*Allocation {
delete(sn.allocations, allocationKey)
sn.allocatedResource.SubFrom(alloc.GetAllocatedResource())
sn.availableResource.AddTo(alloc.GetAllocatedResource())
- sn.nodeEvents.sendAllocationRemovedEvent(sn.NodeID,
alloc.allocationKey, alloc.allocatedResource)
+ sn.nodeEvents.SendAllocationRemovedEvent(sn.NodeID,
alloc.allocationKey, alloc.allocatedResource)
return alloc
}
@@ -327,7 +328,7 @@ func (sn *Node) AddAllocation(alloc *Allocation) bool {
sn.allocations[alloc.GetAllocationKey()] = alloc
sn.allocatedResource.AddTo(res)
sn.availableResource.SubFrom(res)
- sn.nodeEvents.sendAllocationAddedEvent(sn.NodeID,
alloc.allocationKey, res)
+ sn.nodeEvents.SendAllocationAddedEvent(sn.NodeID,
alloc.allocationKey, res)
return true
}
return false
@@ -490,7 +491,7 @@ func (sn *Node) Reserve(app *Application, ask
*AllocationAsk) error {
return fmt.Errorf("reservation does not fit on node %s, appID
%s, ask %s", sn.NodeID, app.ApplicationID, ask.GetAllocatedResource().String())
}
sn.reservations[appReservation.getKey()] = appReservation
- sn.nodeEvents.sendReservedEvent(sn.NodeID, ask.GetAllocatedResource(),
ask.GetAllocationKey())
+ sn.nodeEvents.SendReservedEvent(sn.NodeID, ask.GetAllocatedResource(),
ask.GetAllocationKey())
// reservation added successfully
return nil
}
@@ -512,7 +513,7 @@ func (sn *Node) unReserve(app *Application, ask
*AllocationAsk) (int, error) {
}
if _, ok := sn.reservations[resKey]; ok {
delete(sn.reservations, resKey)
- sn.nodeEvents.sendUnreservedEvent(sn.NodeID,
ask.GetAllocatedResource(), ask.GetAllocationKey())
+ sn.nodeEvents.SendUnreservedEvent(sn.NodeID,
ask.GetAllocatedResource(), ask.GetAllocationKey())
return 1, nil
}
// reservation was not found
@@ -589,9 +590,9 @@ func (sn *Node) getListeners() []NodeListener {
func (sn *Node) SendNodeAddedEvent() {
sn.RLock()
defer sn.RUnlock()
- sn.nodeEvents.sendNodeAddedEvent(sn.NodeID, sn.totalResource.Clone())
+ sn.nodeEvents.SendNodeAddedEvent(sn.NodeID, sn.totalResource.Clone())
}
func (sn *Node) SendNodeRemovedEvent() {
- sn.nodeEvents.sendNodeRemovedEvent(sn.NodeID)
+ sn.nodeEvents.SendNodeRemovedEvent(sn.NodeID)
}
diff --git a/pkg/scheduler/objects/node_test.go
b/pkg/scheduler/objects/node_test.go
index 2a3ba3d7..c3daf2a2 100644
--- a/pkg/scheduler/objects/node_test.go
+++ b/pkg/scheduler/objects/node_test.go
@@ -28,6 +28,7 @@ import (
evtMock "github.com/apache/yunikorn-core/pkg/events/mock"
"github.com/apache/yunikorn-core/pkg/mock"
"github.com/apache/yunikorn-core/pkg/plugins"
+ schedEvt "github.com/apache/yunikorn-core/pkg/scheduler/objects/events"
"github.com/apache/yunikorn-scheduler-interface/lib/go/common"
"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
)
@@ -664,7 +665,7 @@ func TestNodeEvents(t *testing.T) {
"ready": "true",
})
node := NewNode(proto)
- node.nodeEvents = newNodeEvents(mockEvents)
+ node.nodeEvents = schedEvt.NewNodeEvents(mockEvents)
node.SendNodeAddedEvent()
assert.Equal(t, 1, len(mockEvents.Events))
@@ -785,7 +786,7 @@ func TestPreconditions(t *testing.T) {
res :=
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 1})
ask := newAllocationAsk("test", "app001", res)
eventSystem := evtMock.NewEventSystem()
- ask.askEvents = newAskEvents(eventSystem)
+ ask.askEvents = schedEvt.NewAskEvents(eventSystem)
node := NewNode(proto)
// failure
diff --git a/pkg/scheduler/objects/queue.go b/pkg/scheduler/objects/queue.go
index 9960d01a..f6b92c58 100644
--- a/pkg/scheduler/objects/queue.go
+++ b/pkg/scheduler/objects/queue.go
@@ -37,6 +37,7 @@ import (
"github.com/apache/yunikorn-core/pkg/locking"
"github.com/apache/yunikorn-core/pkg/log"
"github.com/apache/yunikorn-core/pkg/metrics"
+ schedEvt "github.com/apache/yunikorn-core/pkg/scheduler/objects/events"
"github.com/apache/yunikorn-core/pkg/scheduler/objects/template"
"github.com/apache/yunikorn-core/pkg/scheduler/policies"
"github.com/apache/yunikorn-core/pkg/scheduler/ugm"
@@ -86,7 +87,7 @@ type Queue struct {
runningApps uint64
allocatingAcceptedApps map[string]bool
template *template.Template
- queueEvents *queueEvents
+ queueEvents *schedEvt.QueueEvents
locking.RWMutex
}
@@ -141,10 +142,10 @@ func NewConfiguredQueue(conf configs.QueueConfig, parent
*Queue) (*Queue, error)
} else {
sq.UpdateQueueProperties()
}
- sq.queueEvents = newQueueEvents(events.GetEventSystem())
+ sq.queueEvents = schedEvt.NewQueueEvents(events.GetEventSystem())
log.Log(log.SchedQueue).Info("configured queue added to scheduler",
zap.String("queueName", sq.QueuePath))
- sq.queueEvents.sendNewQueueEvent(sq.QueuePath, sq.isManaged)
+ sq.queueEvents.SendNewQueueEvent(sq.QueuePath, sq.isManaged)
return sq, nil
}
@@ -202,10 +203,10 @@ func newDynamicQueueInternal(name string, leaf bool,
parent *Queue) (*Queue, err
}
sq.UpdateQueueProperties()
- sq.queueEvents = newQueueEvents(events.GetEventSystem())
+ sq.queueEvents = schedEvt.NewQueueEvents(events.GetEventSystem())
log.Log(log.SchedQueue).Info("dynamic queue added to scheduler",
zap.String("queueName", sq.QueuePath))
- sq.queueEvents.sendNewQueueEvent(sq.QueuePath, sq.isManaged)
+ sq.queueEvents.SendNewQueueEvent(sq.QueuePath, sq.isManaged)
return sq, nil
}
@@ -348,7 +349,7 @@ func (sq *Queue) applyConf(conf configs.QueueConfig) error {
}
if prevLeaf != sq.isLeaf && sq.queueEvents != nil {
- sq.queueEvents.sendTypeChangedEvent(sq.QueuePath, sq.isLeaf)
+ sq.queueEvents.SendTypeChangedEvent(sq.QueuePath, sq.isLeaf)
}
if !sq.isLeaf {
@@ -399,7 +400,7 @@ func (sq *Queue) setResources(guaranteedResource,
maxResource *resources.Resourc
zap.Stringer("current", sq.maxResource),
zap.Stringer("new", maxResource))
if !resources.Equals(sq.maxResource, maxResource) &&
sq.queueEvents != nil {
-
sq.queueEvents.sendMaxResourceChangedEvent(sq.QueuePath, maxResource)
+
sq.queueEvents.SendMaxResourceChangedEvent(sq.QueuePath, maxResource)
}
sq.maxResource = maxResource
sq.updateMaxResourceMetrics()
@@ -409,7 +410,7 @@ func (sq *Queue) setResources(guaranteedResource,
maxResource *resources.Resourc
zap.Stringer("current", sq.maxResource),
zap.Stringer("new", maxResource))
if sq.queueEvents != nil {
-
sq.queueEvents.sendMaxResourceChangedEvent(sq.QueuePath, maxResource)
+
sq.queueEvents.SendMaxResourceChangedEvent(sq.QueuePath, maxResource)
}
sq.maxResource = nil
sq.updateMaxResourceMetrics()
@@ -425,7 +426,7 @@ func (sq *Queue) setResources(guaranteedResource,
maxResource *resources.Resourc
zap.Stringer("current", sq.guaranteedResource),
zap.Stringer("new", guaranteedResource))
if !resources.Equals(sq.guaranteedResource, guaranteedResource)
&& sq.queueEvents != nil {
-
sq.queueEvents.sendGuaranteedResourceChangedEvent(sq.QueuePath,
guaranteedResource)
+
sq.queueEvents.SendGuaranteedResourceChangedEvent(sq.QueuePath,
guaranteedResource)
}
sq.guaranteedResource = guaranteedResource
sq.updateGuaranteedResourceMetrics()
@@ -435,7 +436,7 @@ func (sq *Queue) setResources(guaranteedResource,
maxResource *resources.Resourc
zap.Stringer("current", sq.guaranteedResource),
zap.Stringer("new", guaranteedResource))
if sq.queueEvents != nil {
-
sq.queueEvents.sendGuaranteedResourceChangedEvent(sq.QueuePath,
guaranteedResource)
+
sq.queueEvents.SendGuaranteedResourceChangedEvent(sq.QueuePath,
guaranteedResource)
}
sq.guaranteedResource = nil
sq.updateGuaranteedResourceMetrics()
@@ -748,7 +749,7 @@ func (sq *Queue) AddApplication(app *Application) {
defer sq.Unlock()
appID := app.ApplicationID
sq.applications[appID] = app
- sq.queueEvents.sendNewApplicationEvent(sq.QueuePath, appID)
+ sq.queueEvents.SendNewApplicationEvent(sq.QueuePath, appID)
}
// RemoveApplication removes the app from the list of tracked applications.
Make sure that the app
@@ -763,7 +764,7 @@ func (sq *Queue) RemoveApplication(app *Application) {
zap.String("applicationID", appID))
return
}
- sq.queueEvents.sendRemoveApplicationEvent(sq.QueuePath, appID)
+ sq.queueEvents.SendRemoveApplicationEvent(sq.QueuePath, appID)
if appPending := app.GetPendingResource();
!resources.IsZero(appPending) {
sq.decPendingResource(appPending)
}
@@ -800,7 +801,7 @@ func (sq *Queue) RemoveApplication(app *Application) {
delete(sq.allocatingAcceptedApps, appID)
priority := sq.recalculatePriority()
sq.Unlock()
- app.appEvents.sendRemoveApplicationEvent(appID)
+ app.appEvents.SendRemoveApplicationEvent(appID)
sq.parent.UpdateQueuePriority(sq.Name, priority)
@@ -965,7 +966,7 @@ func (sq *Queue) RemoveQueue() bool {
log.Log(log.SchedQueue).Info("removing queue", zap.String("queue",
sq.QueuePath))
// root is always managed and is the only queue with a nil parent: no
need to guard
sq.parent.removeChildQueue(sq.Name)
- sq.queueEvents.sendRemoveQueueEvent(sq.QueuePath, sq.isManaged)
+ sq.queueEvents.SendRemoveQueueEvent(sq.QueuePath, sq.isManaged)
return true
}
@@ -1276,7 +1277,7 @@ func (sq *Queue) SetMaxResource(max *resources.Resource) {
zap.Stringer("current", sq.maxResource),
zap.Stringer("new", max))
if !resources.Equals(sq.maxResource, max) && sq.queueEvents !=
nil {
-
sq.queueEvents.sendMaxResourceChangedEvent(sq.QueuePath, sq.maxResource)
+
sq.queueEvents.SendMaxResourceChangedEvent(sq.QueuePath, sq.maxResource)
}
sq.maxResource = max.Clone()
sq.updateMaxResourceMetrics()
@@ -1286,7 +1287,7 @@ func (sq *Queue) SetMaxResource(max *resources.Resource) {
zap.Stringer("current", sq.maxResource),
zap.Stringer("new", max))
if sq.queueEvents != nil {
-
sq.queueEvents.sendMaxResourceChangedEvent(sq.QueuePath, sq.maxResource)
+
sq.queueEvents.SendMaxResourceChangedEvent(sq.QueuePath, sq.maxResource)
}
sq.maxResource = nil
sq.updateMaxResourceMetrics()
diff --git a/pkg/scheduler/objects/queue_test.go
b/pkg/scheduler/objects/queue_test.go
index 7cc13d64..18d60172 100644
--- a/pkg/scheduler/objects/queue_test.go
+++ b/pkg/scheduler/objects/queue_test.go
@@ -2478,7 +2478,10 @@ func TestQueueEvents(t *testing.T) {
assert.Equal(t, si.EventRecord_QUEUE, records[3].Type)
assert.Equal(t, si.EventRecord_REMOVE, records[3].EventChangeType)
assert.Equal(t, si.EventRecord_QUEUE_APP, records[3].EventChangeDetail)
- isRemoveApplicationEvent(t, app, records[4])
+ assert.Equal(t, si.EventRecord_APP, records[4].Type, "incorrect event
type, expect app")
+ assert.Equal(t, app.ApplicationID, records[4].ObjectID, "incorrect
object ID, expected application ID")
+ assert.Equal(t, si.EventRecord_REMOVE, records[4].EventChangeType,
"incorrect change type, expected remove")
+ assert.Equal(t, si.EventRecord_DETAILS_NONE,
records[4].EventChangeDetail, "incorrect change detail, expected none")
newConf := configs.QueueConfig{
Parent: false,
@@ -2546,3 +2549,10 @@ func TestQueueRunningAppsForSingleAllocationApp(t
*testing.T) {
assert.Equal(t, app.CurrentState(), Completing.String(), "app state
should be completing")
assert.Equal(t, leaf.runningApps, uint64(0), "leaf should have 0 app
running")
}
+
+func isNewApplicationEvent(t *testing.T, app *Application, record
*si.EventRecord) {
+ assert.Equal(t, si.EventRecord_APP, record.Type, "incorrect event type,
expect app")
+ assert.Equal(t, app.ApplicationID, record.ObjectID, "incorrect object
ID, expected application ID")
+ assert.Equal(t, si.EventRecord_ADD, record.EventChangeType, "incorrect
change type, expected add")
+ assert.Equal(t, si.EventRecord_APP_NEW, record.EventChangeDetail,
"incorrect change detail, expected none")
+}
diff --git a/pkg/scheduler/objects/utilities_test.go
b/pkg/scheduler/objects/utilities_test.go
index 11384b86..170620db 100644
--- a/pkg/scheduler/objects/utilities_test.go
+++ b/pkg/scheduler/objects/utilities_test.go
@@ -31,6 +31,7 @@ import (
"github.com/apache/yunikorn-core/pkg/common/security"
"github.com/apache/yunikorn-core/pkg/events"
"github.com/apache/yunikorn-core/pkg/rmproxy"
+ schedEvt "github.com/apache/yunikorn-core/pkg/scheduler/objects/events"
"github.com/apache/yunikorn-core/pkg/scheduler/ugm"
"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
)
@@ -177,7 +178,7 @@ func newNodeInternal(nodeID string, total, occupied
*resources.Resource) *Node {
allocations: make(map[string]*Allocation),
schedulable: true,
reservations: make(map[string]*reservation),
- nodeEvents: newNodeEvents(events.GetEventSystem()),
+ nodeEvents:
schedEvt.NewNodeEvents(events.GetEventSystem()),
}
return sn
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]