This is an automated email from the ASF dual-hosted git repository.
ccondit pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git
The following commit(s) were added to refs/heads/master by this push:
new aab9eb77 [YUNIKORN-2593] Remove partition from Allocation and
AllocationAsk (#858)
aab9eb77 is described below
commit aab9eb772974b42679afdcf0749afc4528d56888
Author: Craig Condit <[email protected]>
AuthorDate: Wed May 8 11:29:34 2024 -0500
[YUNIKORN-2593] Remove partition from Allocation and AllocationAsk (#858)
The partition name is not required in Allocation and AllocationAsk, as it can
be found in other contexts. Remove it to make the objects smaller.
Closes: #858
---
pkg/scheduler/context.go | 12 ++++++------
pkg/scheduler/objects/allocation.go | 8 --------
pkg/scheduler/objects/allocation_ask.go | 8 --------
pkg/scheduler/objects/application.go | 24 ++++++++++++------------
pkg/scheduler/objects/preemption.go | 2 +-
pkg/scheduler/partition.go | 2 +-
pkg/scheduler/partition_test.go | 6 +++---
pkg/webservice/dao/allocation_ask_info.go | 1 -
pkg/webservice/dao/allocation_info.go | 1 -
pkg/webservice/handlers.go | 2 --
10 files changed, 23 insertions(+), 43 deletions(-)
diff --git a/pkg/scheduler/context.go b/pkg/scheduler/context.go
index 3d514e67..2ecda58e 100644
--- a/pkg/scheduler/context.go
+++ b/pkg/scheduler/context.go
@@ -143,7 +143,7 @@ func (cc *ClusterContext) schedule() bool {
metrics.GetSchedulerMetrics().ObserveSchedulingLatency(schedulingStart)
if alloc.GetResult() == objects.Replaced {
// communicate the removal to the RM
- cc.notifyRMAllocationReleased(psc.RmID,
alloc.GetReleasesClone(), si.TerminationType_PLACEHOLDER_REPLACED, "replacing
allocationKey: "+alloc.GetAllocationKey())
+ cc.notifyRMAllocationReleased(psc.RmID,
psc.Name, alloc.GetReleasesClone(), si.TerminationType_PLACEHOLDER_REPLACED,
"replacing allocationKey: "+alloc.GetAllocationKey())
} else {
cc.notifyRMNewAllocation(psc.RmID, alloc)
}
@@ -564,7 +564,7 @@ func (cc *ClusterContext)
handleRMUpdateApplicationEvent(event *rmevent.RMUpdate
}
allocations :=
partition.removeApplication(app.ApplicationID)
if len(allocations) > 0 {
- cc.notifyRMAllocationReleased(partition.RmID,
allocations, si.TerminationType_STOPPED_BY_RM,
+ cc.notifyRMAllocationReleased(partition.RmID,
partition.Name, allocations, si.TerminationType_STOPPED_BY_RM,
fmt.Sprintf("Application %s Removed",
app.ApplicationID))
}
log.Log(log.SchedContext).Info("Application removed
from partition",
@@ -690,7 +690,7 @@ func (cc *ClusterContext) updateNode(nodeInfo *si.NodeInfo)
{
node.SendNodeRemovedEvent()
// notify the shim allocations have been released from node
if len(released) != 0 {
- cc.notifyRMAllocationReleased(partition.RmID, released,
si.TerminationType_STOPPED_BY_RM,
+ cc.notifyRMAllocationReleased(partition.RmID,
partition.Name, released, si.TerminationType_STOPPED_BY_RM,
fmt.Sprintf("Node %s Removed", node.NodeID))
}
for _, confirm := range confirmed {
@@ -845,7 +845,7 @@ func (cc *ClusterContext)
processAllocationReleases(releases []*si.AllocationRel
allocs, confirmed :=
partition.removeAllocation(toRelease)
// notify the RM of the exact released allocations
if len(allocs) > 0 {
- cc.notifyRMAllocationReleased(rmID, allocs,
si.TerminationType_STOPPED_BY_RM, "allocation remove as per RM request")
+ cc.notifyRMAllocationReleased(rmID,
partition.Name, allocs, si.TerminationType_STOPPED_BY_RM, "allocation remove as
per RM request")
}
// notify the RM of the confirmed allocations
(placeholder swap & preemption)
if confirmed != nil {
@@ -887,7 +887,7 @@ func (cc *ClusterContext) notifyRMNewAllocation(rmID
string, alloc *objects.Allo
// Create a RM update event to notify RM of released allocations
// Lock free call, all updates occur via events.
-func (cc *ClusterContext) notifyRMAllocationReleased(rmID string, released
[]*objects.Allocation, terminationType si.TerminationType, message string) {
+func (cc *ClusterContext) notifyRMAllocationReleased(rmID string,
partitionName string, released []*objects.Allocation, terminationType
si.TerminationType, message string) {
c := make(chan *rmevent.Result)
releaseEvent := &rmevent.RMReleaseAllocationEvent{
ReleasedAllocations: make([]*si.AllocationRelease, 0),
@@ -897,7 +897,7 @@ func (cc *ClusterContext) notifyRMAllocationReleased(rmID
string, released []*ob
for _, alloc := range released {
releaseEvent.ReleasedAllocations =
append(releaseEvent.ReleasedAllocations, &si.AllocationRelease{
ApplicationID: alloc.GetApplicationID(),
- PartitionName: alloc.GetPartitionName(),
+ PartitionName: partitionName,
TerminationType: terminationType,
Message: message,
AllocationKey: alloc.GetAllocationKey(),
diff --git a/pkg/scheduler/objects/allocation.go
b/pkg/scheduler/objects/allocation.go
index dd4c1f99..fff75116 100644
--- a/pkg/scheduler/objects/allocation.go
+++ b/pkg/scheduler/objects/allocation.go
@@ -25,7 +25,6 @@ import (
"go.uber.org/zap"
- "github.com/apache/yunikorn-core/pkg/common"
"github.com/apache/yunikorn-core/pkg/common/resources"
"github.com/apache/yunikorn-core/pkg/locking"
"github.com/apache/yunikorn-core/pkg/log"
@@ -90,7 +89,6 @@ func NewAllocation(nodeID string, ask *AllocationAsk)
*Allocation {
createTime: createTime,
bindTime: time.Now(),
nodeID: nodeID,
- partitionName:
common.GetPartitionNameWithoutClusterID(ask.GetPartitionName()),
tags: ask.GetTagsClone(),
priority: ask.GetPriority(),
allocatedResource: ask.GetAllocatedResource().Clone(),
@@ -139,7 +137,6 @@ func NewAllocationFromSI(alloc *si.Allocation) *Allocation {
ask := &AllocationAsk{
allocationKey: alloc.AllocationKey,
applicationID: alloc.ApplicationID,
- partitionName: alloc.PartitionName,
allocatedResource:
resources.NewResourceFromProto(alloc.ResourcePerAlloc),
tags: CloneAllocationTags(alloc.AllocationTags),
priority: alloc.Priority,
@@ -204,11 +201,6 @@ func (a *Allocation) GetApplicationID() string {
return a.applicationID
}
-// GetPartitionName returns the partition name for this allocation
-func (a *Allocation) GetPartitionName() string {
- return a.partitionName
-}
-
// GetTaskGroup returns the task group name for this allocation
func (a *Allocation) GetTaskGroup() string {
return a.taskGroupName
diff --git a/pkg/scheduler/objects/allocation_ask.go
b/pkg/scheduler/objects/allocation_ask.go
index fc39a770..4701beae 100644
--- a/pkg/scheduler/objects/allocation_ask.go
+++ b/pkg/scheduler/objects/allocation_ask.go
@@ -36,7 +36,6 @@ type AllocationAsk struct {
// Read-only fields
allocationKey string
applicationID string
- partitionName string
taskGroupName string // task group this allocation ask belongs to
placeholder bool // is this a placeholder allocation ask
createTime time.Time // the time this ask was created (used in
reservations)
@@ -89,8 +88,6 @@ func NewAllocationAskFromSI(ask *si.AllocationAsk)
*AllocationAsk {
allocationKey: ask.AllocationKey,
allocatedResource:
resources.NewResourceFromProto(ask.ResourceAsk),
applicationID: ask.ApplicationID,
- partitionName: ask.PartitionName,
-
tags: CloneAllocationTags(ask.Tags),
createTime: time.Now(),
priority: ask.Priority,
@@ -132,11 +129,6 @@ func (aa *AllocationAsk) GetApplicationID() string {
return aa.applicationID
}
-// GetPartitionName returns the partition name for this ask
-func (aa *AllocationAsk) GetPartitionName() string {
- return aa.partitionName
-}
-
// allocate marks the ask as allocated and returns true if successful. An ask
may not be allocated multiple times.
func (aa *AllocationAsk) allocate() bool {
aa.Lock()
diff --git a/pkg/scheduler/objects/application.go
b/pkg/scheduler/objects/application.go
index 8b280891..5ef35dea 100644
--- a/pkg/scheduler/objects/application.go
+++ b/pkg/scheduler/objects/application.go
@@ -356,7 +356,7 @@ func (sa *Application) timeoutStateTimer(expectedState
string, event application
alloc.SetReleased(true)
toRelease = append(toRelease, alloc)
}
- sa.notifyRMAllocationReleased(sa.rmID,
toRelease, si.TerminationType_TIMEOUT, "releasing placeholders on app complete")
+ sa.notifyRMAllocationReleased(toRelease,
si.TerminationType_TIMEOUT, "releasing placeholders on app complete")
sa.clearStateTimer()
} else {
// nolint: errcheck
@@ -430,7 +430,7 @@ func (sa *Application) timeoutPlaceholderProcessing() {
zap.String("AppID", sa.ApplicationID),
zap.Int("placeholders being replaced", replacing),
zap.Int("releasing placeholders", len(toRelease)))
- sa.notifyRMAllocationReleased(sa.rmID, toRelease,
si.TerminationType_TIMEOUT, "releasing allocated placeholders on placeholder
timeout")
+ sa.notifyRMAllocationReleased(toRelease,
si.TerminationType_TIMEOUT, "releasing allocated placeholders on placeholder
timeout")
// Case 2: in every other case fail the application, and notify the
context about the expired placeholder asks
default:
log.Log(log.SchedApplication).Info("Placeholder timeout,
releasing asks and placeholders",
@@ -449,10 +449,10 @@ func (sa *Application) timeoutPlaceholderProcessing() {
zap.String("currentState", sa.CurrentState()),
zap.Error(err))
}
- sa.notifyRMAllocationAskReleased(sa.rmID,
sa.getAllRequestsInternal(), si.TerminationType_TIMEOUT, "releasing
placeholders asks on placeholder timeout")
+ sa.notifyRMAllocationAskReleased(sa.getAllRequestsInternal(),
si.TerminationType_TIMEOUT, "releasing placeholders asks on placeholder
timeout")
sa.removeAsksInternal("", si.EventRecord_REQUEST_TIMEOUT)
// all allocations are placeholders but GetAllAllocations is
locked and cannot be used
- sa.notifyRMAllocationReleased(sa.rmID,
sa.getPlaceholderAllocations(), si.TerminationType_TIMEOUT, "releasing
allocated placeholders on placeholder timeout")
+ sa.notifyRMAllocationReleased(sa.getPlaceholderAllocations(),
si.TerminationType_TIMEOUT, "releasing allocated placeholders on placeholder
timeout")
// we are in an accepted or new state so nothing can be
replaced yet: mark everything as timedout
for _, phData := range sa.placeholderData {
phData.TimedOut = phData.Count
@@ -1144,7 +1144,7 @@ func (sa *Application)
tryPlaceholderAllocate(nodeIterator func() NodeIterator,
zap.Stringer("placeholder resource",
ph.GetAllocatedResource()))
// release the placeholder and tell the RM
ph.SetReleased(true)
- sa.notifyRMAllocationReleased(sa.rmID,
[]*Allocation{ph}, si.TerminationType_TIMEOUT, "cancel placeholder: resource
incompatible")
+
sa.notifyRMAllocationReleased([]*Allocation{ph}, si.TerminationType_TIMEOUT,
"cancel placeholder: resource incompatible")
sa.appEvents.sendPlaceholderLargerEvent(ph,
request)
continue
}
@@ -1347,7 +1347,7 @@ func (sa *Application) tryRequiredNodePreemption(reserve
*reservation, ask *Allo
victim.MarkPreempted()
}
ask.MarkTriggeredPreemption()
- sa.notifyRMAllocationReleased(sa.rmID, victims,
si.TerminationType_PREEMPTED_BY_SCHEDULER,
+ sa.notifyRMAllocationReleased(victims,
si.TerminationType_PREEMPTED_BY_SCHEDULER,
"preempting allocations to free up resources to run
daemon set ask: "+ask.GetAllocationKey())
return true
}
@@ -1943,7 +1943,7 @@ func (sa *Application) executeTerminatedCallback() {
// notifyRMAllocationReleased send an allocation release event to the RM to if
the event handler is configured
// and at least one allocation has been released.
// No locking must be called while holding the lock
-func (sa *Application) notifyRMAllocationReleased(rmID string, released
[]*Allocation, terminationType si.TerminationType, message string) {
+func (sa *Application) notifyRMAllocationReleased(released []*Allocation,
terminationType si.TerminationType, message string) {
// only generate event if needed
if len(released) == 0 || sa.rmEventHandler == nil {
return
@@ -1951,13 +1951,13 @@ func (sa *Application) notifyRMAllocationReleased(rmID
string, released []*Alloc
c := make(chan *rmevent.Result)
releaseEvent := &rmevent.RMReleaseAllocationEvent{
ReleasedAllocations: make([]*si.AllocationRelease, 0),
- RmID: rmID,
+ RmID: sa.rmID,
Channel: c,
}
for _, alloc := range released {
releaseEvent.ReleasedAllocations =
append(releaseEvent.ReleasedAllocations, &si.AllocationRelease{
ApplicationID: alloc.GetApplicationID(),
- PartitionName: alloc.GetPartitionName(),
+ PartitionName: sa.Partition,
AllocationKey: alloc.GetAllocationKey(),
TerminationType: terminationType,
Message: message,
@@ -1976,19 +1976,19 @@ func (sa *Application) notifyRMAllocationReleased(rmID
string, released []*Alloc
// notifyRMAllocationAskReleased send an ask release event to the RM to if the
event handler is configured
// and at least one ask has been released.
// No locking must be called while holding the lock
-func (sa *Application) notifyRMAllocationAskReleased(rmID string, released
[]*AllocationAsk, terminationType si.TerminationType, message string) {
+func (sa *Application) notifyRMAllocationAskReleased(released
[]*AllocationAsk, terminationType si.TerminationType, message string) {
// only generate event if needed
if len(released) == 0 || sa.rmEventHandler == nil {
return
}
releaseEvent := &rmevent.RMReleaseAllocationAskEvent{
ReleasedAllocationAsks: make([]*si.AllocationAskRelease, 0),
- RmID: rmID,
+ RmID: sa.rmID,
}
for _, alloc := range released {
releaseEvent.ReleasedAllocationAsks =
append(releaseEvent.ReleasedAllocationAsks, &si.AllocationAskRelease{
ApplicationID: alloc.GetApplicationID(),
- PartitionName: alloc.GetPartitionName(),
+ PartitionName:
common.GetPartitionNameWithoutClusterID(sa.Partition),
AllocationKey: alloc.GetAllocationKey(),
TerminationType: terminationType,
Message: message,
diff --git a/pkg/scheduler/objects/preemption.go
b/pkg/scheduler/objects/preemption.go
index efac8fa1..e19929ed 100644
--- a/pkg/scheduler/objects/preemption.go
+++ b/pkg/scheduler/objects/preemption.go
@@ -569,7 +569,7 @@ func (p *Preemptor) TryPreemption() (*Allocation, bool) {
p.ask.MarkTriggeredPreemption()
// notify RM that victims should be released
- p.application.notifyRMAllocationReleased(p.application.rmID, victims,
si.TerminationType_PREEMPTED_BY_SCHEDULER,
+ p.application.notifyRMAllocationReleased(victims,
si.TerminationType_PREEMPTED_BY_SCHEDULER,
"preempting allocations to free up resources to run ask:
"+p.ask.GetAllocationKey())
// reserve the selected node for the new allocation if it will fit
diff --git a/pkg/scheduler/partition.go b/pkg/scheduler/partition.go
index b36bfd5c..f2154136 100644
--- a/pkg/scheduler/partition.go
+++ b/pkg/scheduler/partition.go
@@ -46,7 +46,7 @@ import (
type PartitionContext struct {
RmID string // the RM the partition belongs to
- Name string // name of the partition (logging mainly)
+ Name string // name of the partition
// Private fields need protection
root *objects.Queue // start of the
queue hierarchy
diff --git a/pkg/scheduler/partition_test.go b/pkg/scheduler/partition_test.go
index dffae4e6..eabb1ad4 100644
--- a/pkg/scheduler/partition_test.go
+++ b/pkg/scheduler/partition_test.go
@@ -2948,7 +2948,7 @@ func TestPlaceholderSmallerThanReal(t *testing.T) {
// release placeholder: do what the context would do after the shim
processing
release := &si.AllocationRelease{
- PartitionName: ph.GetPartitionName(),
+ PartitionName: partition.Name,
ApplicationID: appID1,
AllocationKey: ph.GetAllocationKey(),
TerminationType: si.TerminationType_TIMEOUT,
@@ -3020,7 +3020,7 @@ func TestPlaceholderSmallerMulti(t *testing.T) {
for id, ph := range phs {
assert.Assert(t, ph.IsReleased(), "placeholder %s should be
released", id)
release := &si.AllocationRelease{
- PartitionName: ph.GetPartitionName(),
+ PartitionName: partition.Name,
ApplicationID: appID1,
AllocationKey: ph.GetAllocationKey(),
TerminationType: si.TerminationType_TIMEOUT,
@@ -3087,7 +3087,7 @@ func TestPlaceholderBiggerThanReal(t *testing.T) {
// replace the placeholder: do what the context would do after the shim
processing
release := &si.AllocationRelease{
- PartitionName: ph.GetPartitionName(),
+ PartitionName: partition.Name,
ApplicationID: appID1,
AllocationKey: ph.GetAllocationKey(),
TerminationType: si.TerminationType_PLACEHOLDER_REPLACED,
diff --git a/pkg/webservice/dao/allocation_ask_info.go
b/pkg/webservice/dao/allocation_ask_info.go
index 5f865b6d..8516047e 100644
--- a/pkg/webservice/dao/allocation_ask_info.go
+++ b/pkg/webservice/dao/allocation_ask_info.go
@@ -32,7 +32,6 @@ type AllocationAskDAOInfo struct {
Priority string
`json:"priority,omitempty"`
RequiredNodeID string
`json:"requiredNodeId,omitempty"`
ApplicationID string
`json:"applicationId,omitempty"`
- Partition string
`json:"partition,omitempty"`
Placeholder bool
`json:"placeholder,omitempty"`
TaskGroupName string
`json:"taskGroupName,omitempty"`
AllocationLog []*AllocationAskLogDAOInfo
`json:"allocationLog,omitempty"`
diff --git a/pkg/webservice/dao/allocation_info.go
b/pkg/webservice/dao/allocation_info.go
index 8c9ea2ff..aec6b994 100644
--- a/pkg/webservice/dao/allocation_info.go
+++ b/pkg/webservice/dao/allocation_info.go
@@ -28,7 +28,6 @@ type AllocationDAOInfo struct {
Priority string `json:"priority,omitempty"`
NodeID string `json:"nodeId,omitempty"`
ApplicationID string `json:"applicationId,omitempty"`
- Partition string `json:"partition,omitempty"`
Placeholder bool `json:"placeholder,omitempty"`
PlaceholderUsed bool `json:"placeholderUsed,omitempty"`
TaskGroupName string `json:"taskGroupName,omitempty"`
diff --git a/pkg/webservice/handlers.go b/pkg/webservice/handlers.go
index 04c70c50..5ccc31a5 100644
--- a/pkg/webservice/handlers.go
+++ b/pkg/webservice/handlers.go
@@ -226,7 +226,6 @@ func getAllocationDAO(alloc *objects.Allocation)
*dao.AllocationDAOInfo {
Priority: strconv.Itoa(int(alloc.GetPriority())),
NodeID: alloc.GetNodeID(),
ApplicationID: alloc.GetApplicationID(),
- Partition: alloc.GetPartitionName(),
Preempted: alloc.IsPreempted(),
}
return allocDAO
@@ -327,7 +326,6 @@ func getAllocationAskDAO(ask *objects.AllocationAsk)
*dao.AllocationAskDAOInfo {
Priority: strconv.Itoa(int(ask.GetPriority())),
RequiredNodeID: ask.GetRequiredNode(),
ApplicationID: ask.GetApplicationID(),
- Partition:
common.GetPartitionNameWithoutClusterID(ask.GetPartitionName()),
Placeholder: ask.IsPlaceholder(),
TaskGroupName: ask.GetTaskGroup(),
AllocationLog:
getAllocationLogsDAO(ask.GetAllocationLog()),
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]