This is an automated email from the ASF dual-hosted git repository.
ccondit pushed a commit to branch branch-1.6
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git
The following commit(s) were added to refs/heads/branch-1.6 by this push:
new 9b1745ab [YUNIKORN-2978] Fix handling of reserved allocations where node differs (#996)
9b1745ab is described below
commit 9b1745ab499099832a81fc43cc695acc17a0e757
Author: Craig Condit <[email protected]>
AuthorDate: Tue Nov 19 16:20:16 2024 -0600
[YUNIKORN-2978] Fix handling of reserved allocations where node differs (#996)
YUNIKORN-2700 introduced a bug where allocations of previously reserved
tasks were not handled correctly when the task was scheduled on a
different node than the one it was reserved on. Ensure that we always
unreserve on the reserved node and allocate on the target node, whether
or not the two nodes differ.
Also add logging whenever an allocation is added to, removed from, or
replaced on a node, so that issues like this are easier to find in the future.
Closes: #996
---
pkg/scheduler/objects/node.go | 18 +++++++++++++++
pkg/scheduler/partition.go | 51 +++++++++++++++++++++++++----------------
pkg/scheduler/partition_test.go | 3 +++
3 files changed, 52 insertions(+), 20 deletions(-)
diff --git a/pkg/scheduler/objects/node.go b/pkg/scheduler/objects/node.go
index d4506320..f8fe64c8 100644
--- a/pkg/scheduler/objects/node.go
+++ b/pkg/scheduler/objects/node.go
@@ -323,6 +323,12 @@ func (sn *Node) RemoveAllocation(allocationKey string)
*Allocation {
sn.allocatedResource.Prune()
sn.availableResource.AddTo(alloc.GetAllocatedResource())
sn.nodeEvents.SendAllocationRemovedEvent(sn.NodeID,
alloc.allocationKey, alloc.GetAllocatedResource())
+ log.Log(log.SchedNode).Info("node allocation removed",
+ zap.String("appID", alloc.GetApplicationID()),
+ zap.String("allocationKey", alloc.GetAllocationKey()),
+ zap.Stringer("allocatedResource",
alloc.GetAllocatedResource()),
+ zap.Bool("placeholder", alloc.IsPlaceholder()),
+ zap.String("targetNode", sn.NodeID))
return alloc
}
@@ -365,6 +371,12 @@ func (sn *Node) addAllocationInternal(alloc *Allocation,
force bool) bool {
sn.availableResource.SubFrom(res)
sn.availableResource.Prune()
sn.nodeEvents.SendAllocationAddedEvent(sn.NodeID,
alloc.allocationKey, res)
+ log.Log(log.SchedNode).Info("node allocation processed",
+ zap.String("appID", alloc.GetApplicationID()),
+ zap.String("allocationKey", alloc.GetAllocationKey()),
+ zap.Stringer("allocatedResource",
alloc.GetAllocatedResource()),
+ zap.Bool("placeholder", alloc.IsPlaceholder()),
+ zap.String("targetNode", sn.NodeID))
result = true
return result
}
@@ -389,6 +401,12 @@ func (sn *Node) ReplaceAllocation(allocationKey string,
replace *Allocation, del
sn.allocatedResource.AddTo(delta)
sn.availableResource.SubFrom(delta)
sn.availableResource.Prune()
+ log.Log(log.SchedNode).Info("node allocation replaced",
+ zap.String("appID", replace.GetApplicationID()),
+ zap.String("allocationKey", replace.GetAllocationKey()),
+ zap.Stringer("allocatedResource",
replace.GetAllocatedResource()),
+ zap.String("placeholderKey", allocationKey),
+ zap.String("targetNode", sn.NodeID))
if !before.FitIn(sn.allocatedResource) {
log.Log(log.SchedNode).Warn("unexpected increase in node usage
after placeholder replacement",
zap.String("placeholder allocationKey", allocationKey),
diff --git a/pkg/scheduler/partition.go b/pkg/scheduler/partition.go
index 1acd5af2..c52c8734 100644
--- a/pkg/scheduler/partition.go
+++ b/pkg/scheduler/partition.go
@@ -867,21 +867,12 @@ func (pc *PartitionContext) allocate(result
*objects.AllocationResult) *objects.
// find the node make sure it still exists
// if the node was passed in use that ID instead of the one from the
allocation
// the node ID is set when a reservation is allocated on a non-reserved
node
- var nodeID string
alloc := result.Request
- if result.ReservedNodeID == "" {
- nodeID = result.NodeID
- } else {
- nodeID = result.ReservedNodeID
- log.Log(log.SchedPartition).Debug("Reservation allocated on
different node",
- zap.String("current node", result.NodeID),
- zap.String("reserved node", nodeID),
- zap.String("appID", appID))
- }
- node := pc.GetNode(nodeID)
- if node == nil {
- log.Log(log.SchedPartition).Info("Node was removed while
allocating",
- zap.String("nodeID", nodeID),
+ targetNodeID := result.NodeID
+ targetNode := pc.GetNode(targetNodeID)
+ if targetNode == nil {
+ log.Log(log.SchedPartition).Info("Target node was removed while
allocating",
+ zap.String("nodeID", targetNodeID),
zap.String("appID", appID))
// attempt to deallocate
@@ -889,7 +880,7 @@ func (pc *PartitionContext) allocate(result
*objects.AllocationResult) *objects.
allocKey := alloc.GetAllocationKey()
if _, err := app.DeallocateAsk(allocKey); err != nil {
log.Log(log.SchedPartition).Warn("Failed to
unwind allocation",
- zap.String("nodeID", nodeID),
+ zap.String("nodeID", targetNodeID),
zap.String("appID", appID),
zap.String("allocationKey", allocKey),
zap.Error(err))
@@ -897,14 +888,34 @@ func (pc *PartitionContext) allocate(result
*objects.AllocationResult) *objects.
}
return nil
}
+
// reservation
if result.ResultType == objects.Reserved {
- pc.reserve(app, node, result.Request)
+ pc.reserve(app, targetNode, result.Request)
return nil
}
+
// unreserve
if result.ResultType == objects.Unreserved || result.ResultType ==
objects.AllocatedReserved {
- pc.unReserve(app, node, result.Request)
+ var reservedNodeID string
+ if result.ReservedNodeID == "" {
+ reservedNodeID = result.NodeID
+ } else {
+ reservedNodeID = result.ReservedNodeID
+ log.Log(log.SchedPartition).Debug("Reservation
allocated on different node",
+ zap.String("current node", result.NodeID),
+ zap.String("reserved node", reservedNodeID),
+ zap.String("appID", appID))
+ }
+
+ reservedNode := pc.GetNode(reservedNodeID)
+ if reservedNode != nil {
+ pc.unReserve(app, reservedNode, result.Request)
+ } else {
+ log.Log(log.SchedPartition).Info("Reserved node was
removed while allocating",
+ zap.String("nodeID", reservedNodeID),
+ zap.String("appID", appID))
+ }
if result.ResultType == objects.Unreserved {
return nil
}
@@ -913,8 +924,8 @@ func (pc *PartitionContext) allocate(result
*objects.AllocationResult) *objects.
}
alloc.SetBindTime(time.Now())
- alloc.SetNodeID(nodeID)
- alloc.SetInstanceType(node.GetInstanceType())
+ alloc.SetNodeID(targetNodeID)
+ alloc.SetInstanceType(targetNode.GetInstanceType())
// track the number of allocations
pc.updateAllocationCount(1)
@@ -927,7 +938,7 @@ func (pc *PartitionContext) allocate(result
*objects.AllocationResult) *objects.
zap.String("allocationKey", result.Request.GetAllocationKey()),
zap.Stringer("allocatedResource",
result.Request.GetAllocatedResource()),
zap.Bool("placeholder", result.Request.IsPlaceholder()),
- zap.String("targetNode", alloc.GetNodeID()))
+ zap.String("targetNode", targetNodeID))
// pass the allocation result back to the RM via the cluster context
return result
}
diff --git a/pkg/scheduler/partition_test.go b/pkg/scheduler/partition_test.go
index 79c4cdd5..e436ea71 100644
--- a/pkg/scheduler/partition_test.go
+++ b/pkg/scheduler/partition_test.go
@@ -2327,6 +2327,9 @@ func TestAllocReserveNewNode(t *testing.T) {
assert.Equal(t, 0, len(node1.GetReservationKeys()), "old node should
have no more reservations")
assert.Equal(t, 0, len(app.GetReservations()), "ask should have been
reserved")
assertLimits(t, getTestUserGroup(),
resources.NewResourceFromMap(map[string]resources.Quantity{"vcore": 16000}))
+ alloc2 := node2.GetAllocation("alloc-2")
+ assert.Assert(t, alloc2 != nil, "alloc was nil")
+ assert.Equal(t, nodeID2, alloc2.GetNodeID(), "wrong node id")
}
func TestTryAllocateReserve(t *testing.T) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]