This is an automated email from the ASF dual-hosted git repository.

ccondit pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git


The following commit(s) were added to refs/heads/master by this push:
     new 135bc788 [YUNIKORN-2978] Fix handling of reserved allocations where node differs (#996)
135bc788 is described below

commit 135bc7880e6bcc119aa48ad2969b484a3a8d26fa
Author: Craig Condit <[email protected]>
AuthorDate: Tue Nov 19 16:20:16 2024 -0600

    [YUNIKORN-2978] Fix handling of reserved allocations where node differs (#996)
    
    YUNIKORN-2700 introduced a bug where allocations of previously-reserved
    tasks were not handled correctly in the case where we schedule on a
    different node than the reservation. Ensure that we unreserve and
    allocate using the proper node in both cases.
    
    Also introduce additional logging of allocations on nodes to make
    finding issues like this easier in the future.
    
    Closes: #996
---
 pkg/scheduler/objects/node.go   | 22 ++++++++++++++++++
 pkg/scheduler/partition.go      | 51 +++++++++++++++++++++++++----------------
 pkg/scheduler/partition_test.go |  3 +++
 3 files changed, 56 insertions(+), 20 deletions(-)

diff --git a/pkg/scheduler/objects/node.go b/pkg/scheduler/objects/node.go
index 8187f719..ae5971f4 100644
--- a/pkg/scheduler/objects/node.go
+++ b/pkg/scheduler/objects/node.go
@@ -344,6 +344,12 @@ func (sn *Node) RemoveAllocation(allocationKey string) *Allocation {
                }
                sn.availableResource.AddTo(alloc.GetAllocatedResource())
                sn.nodeEvents.SendAllocationRemovedEvent(sn.NodeID, alloc.allocationKey, alloc.GetAllocatedResource())
+               log.Log(log.SchedNode).Info("node allocation removed",
+                       zap.String("appID", alloc.GetApplicationID()),
+                       zap.String("allocationKey", alloc.GetAllocationKey()),
+                       zap.Stringer("allocatedResource", alloc.GetAllocatedResource()),
+                       zap.Bool("placeholder", alloc.IsPlaceholder()),
+                       zap.String("targetNode", sn.NodeID))
                return alloc
        }
 
@@ -382,6 +388,10 @@ func (sn *Node) UpdateForeignAllocation(alloc *Allocation) *Allocation {
        delta := resources.Sub(newResource, existingResource)
        delta.Prune()
 
+       log.Log(log.SchedNode).Info("node foreign allocation updated",
+               zap.String("allocationKey", alloc.GetAllocationKey()),
+               zap.Stringer("deltaResource", delta),
+               zap.String("targetNode", sn.NodeID))
        sn.occupiedResource.AddTo(delta)
        sn.occupiedResource.Prune()
        sn.refreshAvailableResource()
@@ -416,6 +426,12 @@ func (sn *Node) addAllocationInternal(alloc *Allocation, force bool) bool {
                sn.availableResource.SubFrom(res)
                sn.availableResource.Prune()
                sn.nodeEvents.SendAllocationAddedEvent(sn.NodeID, alloc.allocationKey, res)
+               log.Log(log.SchedNode).Info("node allocation processed",
+                       zap.String("appID", alloc.GetApplicationID()),
+                       zap.String("allocationKey", alloc.GetAllocationKey()),
+                       zap.Stringer("allocatedResource", alloc.GetAllocatedResource()),
+                       zap.Bool("placeholder", alloc.IsPlaceholder()),
+                       zap.String("targetNode", sn.NodeID))
                result = true
                return result
        }
@@ -440,6 +456,12 @@ func (sn *Node) ReplaceAllocation(allocationKey string, replace *Allocation, del
        sn.allocatedResource.AddTo(delta)
        sn.availableResource.SubFrom(delta)
        sn.availableResource.Prune()
+       log.Log(log.SchedNode).Info("node allocation replaced",
+               zap.String("appID", replace.GetApplicationID()),
+               zap.String("allocationKey", replace.GetAllocationKey()),
+               zap.Stringer("allocatedResource", replace.GetAllocatedResource()),
+               zap.String("placeholderKey", allocationKey),
+               zap.String("targetNode", sn.NodeID))
        if !before.FitIn(sn.allocatedResource) {
                log.Log(log.SchedNode).Warn("unexpected increase in node usage after placeholder replacement",
                        zap.String("placeholder allocationKey", allocationKey),
diff --git a/pkg/scheduler/partition.go b/pkg/scheduler/partition.go
index 0ae63b68..fa8c9a4f 100644
--- a/pkg/scheduler/partition.go
+++ b/pkg/scheduler/partition.go
@@ -869,21 +869,12 @@ func (pc *PartitionContext) allocate(result *objects.AllocationResult) *objects.
        // find the node make sure it still exists
        // if the node was passed in use that ID instead of the one from the allocation
        // the node ID is set when a reservation is allocated on a non-reserved node
-       var nodeID string
        alloc := result.Request
-       if result.ReservedNodeID == "" {
-               nodeID = result.NodeID
-       } else {
-               nodeID = result.ReservedNodeID
-               log.Log(log.SchedPartition).Debug("Reservation allocated on different node",
-                       zap.String("current node", result.NodeID),
-                       zap.String("reserved node", nodeID),
-                       zap.String("appID", appID))
-       }
-       node := pc.GetNode(nodeID)
-       if node == nil {
-               log.Log(log.SchedPartition).Info("Node was removed while allocating",
-                       zap.String("nodeID", nodeID),
+       targetNodeID := result.NodeID
+       targetNode := pc.GetNode(targetNodeID)
+       if targetNode == nil {
+               log.Log(log.SchedPartition).Info("Target node was removed while allocating",
+                       zap.String("nodeID", targetNodeID),
                        zap.String("appID", appID))
 
                // attempt to deallocate
@@ -891,7 +882,7 @@ func (pc *PartitionContext) allocate(result *objects.AllocationResult) *objects.
                        allocKey := alloc.GetAllocationKey()
                        if _, err := app.DeallocateAsk(allocKey); err != nil {
                                log.Log(log.SchedPartition).Warn("Failed to unwind allocation",
-                                       zap.String("nodeID", nodeID),
+                                       zap.String("nodeID", targetNodeID),
                                        zap.String("appID", appID),
                                        zap.String("allocationKey", allocKey),
                                        zap.Error(err))
@@ -899,14 +890,34 @@ func (pc *PartitionContext) allocate(result *objects.AllocationResult) *objects.
                }
                return nil
        }
+
        // reservation
        if result.ResultType == objects.Reserved {
-               pc.reserve(app, node, result.Request)
+               pc.reserve(app, targetNode, result.Request)
                return nil
        }
+
        // unreserve
        if result.ResultType == objects.Unreserved || result.ResultType == objects.AllocatedReserved {
-               pc.unReserve(app, node, result.Request)
+               var reservedNodeID string
+               if result.ReservedNodeID == "" {
+                       reservedNodeID = result.NodeID
+               } else {
+                       reservedNodeID = result.ReservedNodeID
+                       log.Log(log.SchedPartition).Debug("Reservation allocated on different node",
+                               zap.String("current node", result.NodeID),
+                               zap.String("reserved node", reservedNodeID),
+                               zap.String("appID", appID))
+               }
+
+               reservedNode := pc.GetNode(reservedNodeID)
+               if reservedNode != nil {
+                       pc.unReserve(app, reservedNode, result.Request)
+               } else {
+                       log.Log(log.SchedPartition).Info("Reserved node was removed while allocating",
+                               zap.String("nodeID", reservedNodeID),
+                               zap.String("appID", appID))
+               }
                if result.ResultType == objects.Unreserved {
                        return nil
                }
@@ -915,8 +926,8 @@ func (pc *PartitionContext) allocate(result *objects.AllocationResult) *objects.
        }
 
        alloc.SetBindTime(time.Now())
-       alloc.SetNodeID(nodeID)
-       alloc.SetInstanceType(node.GetInstanceType())
+       alloc.SetNodeID(targetNodeID)
+       alloc.SetInstanceType(targetNode.GetInstanceType())
 
        // track the number of allocations
        pc.updateAllocationCount(1)
@@ -929,7 +940,7 @@ func (pc *PartitionContext) allocate(result *objects.AllocationResult) *objects.
                zap.String("allocationKey", result.Request.GetAllocationKey()),
                zap.Stringer("allocatedResource", result.Request.GetAllocatedResource()),
                zap.Bool("placeholder", result.Request.IsPlaceholder()),
-               zap.String("targetNode", alloc.GetNodeID()))
+               zap.String("targetNode", targetNodeID))
        // pass the allocation result back to the RM via the cluster context
        return result
 }
diff --git a/pkg/scheduler/partition_test.go b/pkg/scheduler/partition_test.go
index 32e3f593..72e9da46 100644
--- a/pkg/scheduler/partition_test.go
+++ b/pkg/scheduler/partition_test.go
@@ -2369,6 +2369,9 @@ func TestAllocReserveNewNode(t *testing.T) {
        assert.Equal(t, 0, len(node1.GetReservationKeys()), "old node should have no more reservations")
        assert.Equal(t, 0, len(app.GetReservations()), "ask should have been reserved")
        assertLimits(t, getTestUserGroup(), resources.NewResourceFromMap(map[string]resources.Quantity{"vcore": 16000}))
+       alloc2 := node2.GetAllocation("alloc-2")
+       assert.Assert(t, alloc2 != nil, "alloc was nil")
+       assert.Equal(t, nodeID2, alloc2.GetNodeID(), "wrong node id")
 }
 
 func TestTryAllocateReserve(t *testing.T) {

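For readers skimming the patch without the surrounding code, here is a
minimal, self-contained Go sketch of the rule the change enforces in
PartitionContext.allocate: the allocation is bound to the node the scheduler
actually chose (result.NodeID), while the unreserve targets the node that
held the reservation (result.ReservedNodeID when set). The AllocationResult
struct below is a simplified stand-in, not the real yunikorn-core type; only
the two fields that matter here are modeled.

    package main

    import "fmt"

    // AllocationResult is a simplified stand-in for the scheduler's result
    // type; only the two node fields relevant to this fix are modeled.
    type AllocationResult struct {
            NodeID         string // node the allocation was actually placed on
            ReservedNodeID string // set only when the reservation was on another node
    }

    // reservationNode returns the node whose reservation must be released:
    // the reserved node if the allocation moved, otherwise the target node.
    func reservationNode(r *AllocationResult) string {
            if r.ReservedNodeID == "" {
                    return r.NodeID
            }
            return r.ReservedNodeID
    }

    func main() {
            // Allocation satisfied on the reserved node itself.
            same := &AllocationResult{NodeID: "node-1"}
            // Allocation satisfied elsewhere: unreserve node-1, allocate on
            // node-2. Before this fix, both steps used a single node ID.
            moved := &AllocationResult{NodeID: "node-2", ReservedNodeID: "node-1"}

            fmt.Println("unreserve:", reservationNode(same), "allocate:", same.NodeID)
            fmt.Println("unreserve:", reservationNode(moved), "allocate:", moved.NodeID)
    }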

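The logging half of the change uses the same structured-field style in each
new log line in node.go. Below is a standalone sketch of that zap pattern;
the logger construction is illustrative only, since yunikorn-core wires up
its logger through its own log package, and the field values are made up.

    package main

    import "go.uber.org/zap"

    func main() {
            // Development config prints human-readable output; production
            // deployments would normally use zap.NewProduction() or a
            // custom config instead.
            logger, err := zap.NewDevelopment()
            if err != nil {
                    panic(err)
            }
            defer logger.Sync() //nolint:errcheck

            // Field names mirror the ones the patch emits for allocations
            // on a node, so events can be filtered by appID, allocationKey
            // or targetNode.
            logger.Info("node allocation processed",
                    zap.String("appID", "app-1"),
                    zap.String("allocationKey", "alloc-2"),
                    zap.Bool("placeholder", false),
                    zap.String("targetNode", "node-2"))
    }

Filtering the scheduler log by a targetNode value then surfaces every add,
remove and replace on that node, the kind of trace the commit message says
should make issues like this easier to spot in the future.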
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
