This is an automated email from the ASF dual-hosted git repository.

ccondit pushed a commit to branch branch-1.6
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git


The following commit(s) were added to refs/heads/branch-1.6 by this push:
     new 9b1745ab [YUNIKORN-2978] Fix handling of reserved allocations where node differs (#996)
9b1745ab is described below

commit 9b1745ab499099832a81fc43cc695acc17a0e757
Author: Craig Condit <[email protected]>
AuthorDate: Tue Nov 19 16:20:16 2024 -0600

    [YUNIKORN-2978] Fix handling of reserved allocations where node differs (#996)
    
    YUNIKORN-2700 introduced a bug where allocations of previously-reserved
    tasks were not handled correctly in the case where we schedule on a
    different node than the reservation. Ensure that we unreserve and
    allocate using the proper node in both cases.
    
    Also introduce additional logging of allocations on nodes to make
    finding issues like this easier in the future.
    
    Closes: #996
---
 pkg/scheduler/objects/node.go   | 18 +++++++++++++++
 pkg/scheduler/partition.go      | 51 +++++++++++++++++++++++++----------------
 pkg/scheduler/partition_test.go |  3 +++
 3 files changed, 52 insertions(+), 20 deletions(-)

diff --git a/pkg/scheduler/objects/node.go b/pkg/scheduler/objects/node.go
index d4506320..f8fe64c8 100644
--- a/pkg/scheduler/objects/node.go
+++ b/pkg/scheduler/objects/node.go
@@ -323,6 +323,12 @@ func (sn *Node) RemoveAllocation(allocationKey string) 
*Allocation {
                sn.allocatedResource.Prune()
                sn.availableResource.AddTo(alloc.GetAllocatedResource())
                sn.nodeEvents.SendAllocationRemovedEvent(sn.NodeID, 
alloc.allocationKey, alloc.GetAllocatedResource())
+               log.Log(log.SchedNode).Info("node allocation removed",
+                       zap.String("appID", alloc.GetApplicationID()),
+                       zap.String("allocationKey", alloc.GetAllocationKey()),
+                       zap.Stringer("allocatedResource", 
alloc.GetAllocatedResource()),
+                       zap.Bool("placeholder", alloc.IsPlaceholder()),
+                       zap.String("targetNode", sn.NodeID))
                return alloc
        }
 
@@ -365,6 +371,12 @@ func (sn *Node) addAllocationInternal(alloc *Allocation, 
force bool) bool {
                sn.availableResource.SubFrom(res)
                sn.availableResource.Prune()
                sn.nodeEvents.SendAllocationAddedEvent(sn.NodeID, 
alloc.allocationKey, res)
+               log.Log(log.SchedNode).Info("node allocation processed",
+                       zap.String("appID", alloc.GetApplicationID()),
+                       zap.String("allocationKey", alloc.GetAllocationKey()),
+                       zap.Stringer("allocatedResource", 
alloc.GetAllocatedResource()),
+                       zap.Bool("placeholder", alloc.IsPlaceholder()),
+                       zap.String("targetNode", sn.NodeID))
                result = true
                return result
        }
@@ -389,6 +401,12 @@ func (sn *Node) ReplaceAllocation(allocationKey string, 
replace *Allocation, del
        sn.allocatedResource.AddTo(delta)
        sn.availableResource.SubFrom(delta)
        sn.availableResource.Prune()
+       log.Log(log.SchedNode).Info("node allocation replaced",
+               zap.String("appID", replace.GetApplicationID()),
+               zap.String("allocationKey", replace.GetAllocationKey()),
+               zap.Stringer("allocatedResource", 
replace.GetAllocatedResource()),
+               zap.String("placeholderKey", allocationKey),
+               zap.String("targetNode", sn.NodeID))
        if !before.FitIn(sn.allocatedResource) {
                log.Log(log.SchedNode).Warn("unexpected increase in node usage 
after placeholder replacement",
                        zap.String("placeholder allocationKey", allocationKey),
diff --git a/pkg/scheduler/partition.go b/pkg/scheduler/partition.go
index 1acd5af2..c52c8734 100644
--- a/pkg/scheduler/partition.go
+++ b/pkg/scheduler/partition.go
@@ -867,21 +867,12 @@ func (pc *PartitionContext) allocate(result 
*objects.AllocationResult) *objects.
        // find the node make sure it still exists
        // if the node was passed in use that ID instead of the one from the 
allocation
        // the node ID is set when a reservation is allocated on a non-reserved 
node
-       var nodeID string
        alloc := result.Request
-       if result.ReservedNodeID == "" {
-               nodeID = result.NodeID
-       } else {
-               nodeID = result.ReservedNodeID
-               log.Log(log.SchedPartition).Debug("Reservation allocated on 
different node",
-                       zap.String("current node", result.NodeID),
-                       zap.String("reserved node", nodeID),
-                       zap.String("appID", appID))
-       }
-       node := pc.GetNode(nodeID)
-       if node == nil {
-               log.Log(log.SchedPartition).Info("Node was removed while 
allocating",
-                       zap.String("nodeID", nodeID),
+       targetNodeID := result.NodeID
+       targetNode := pc.GetNode(targetNodeID)
+       if targetNode == nil {
+               log.Log(log.SchedPartition).Info("Target node was removed while 
allocating",
+                       zap.String("nodeID", targetNodeID),
                        zap.String("appID", appID))
 
                // attempt to deallocate
@@ -889,7 +880,7 @@ func (pc *PartitionContext) allocate(result 
*objects.AllocationResult) *objects.
                        allocKey := alloc.GetAllocationKey()
                        if _, err := app.DeallocateAsk(allocKey); err != nil {
                                log.Log(log.SchedPartition).Warn("Failed to 
unwind allocation",
-                                       zap.String("nodeID", nodeID),
+                                       zap.String("nodeID", targetNodeID),
                                        zap.String("appID", appID),
                                        zap.String("allocationKey", allocKey),
                                        zap.Error(err))
@@ -897,14 +888,34 @@ func (pc *PartitionContext) allocate(result 
*objects.AllocationResult) *objects.
                }
                return nil
        }
+
        // reservation
        if result.ResultType == objects.Reserved {
-               pc.reserve(app, node, result.Request)
+               pc.reserve(app, targetNode, result.Request)
                return nil
        }
+
        // unreserve
        if result.ResultType == objects.Unreserved || result.ResultType == 
objects.AllocatedReserved {
-               pc.unReserve(app, node, result.Request)
+               var reservedNodeID string
+               if result.ReservedNodeID == "" {
+                       reservedNodeID = result.NodeID
+               } else {
+                       reservedNodeID = result.ReservedNodeID
+                       log.Log(log.SchedPartition).Debug("Reservation 
allocated on different node",
+                               zap.String("current node", result.NodeID),
+                               zap.String("reserved node", reservedNodeID),
+                               zap.String("appID", appID))
+               }
+
+               reservedNode := pc.GetNode(reservedNodeID)
+               if reservedNode != nil {
+                       pc.unReserve(app, reservedNode, result.Request)
+               } else {
+                       log.Log(log.SchedPartition).Info("Reserved node was 
removed while allocating",
+                               zap.String("nodeID", reservedNodeID),
+                               zap.String("appID", appID))
+               }
                if result.ResultType == objects.Unreserved {
                        return nil
                }
@@ -913,8 +924,8 @@ func (pc *PartitionContext) allocate(result 
*objects.AllocationResult) *objects.
        }
 
        alloc.SetBindTime(time.Now())
-       alloc.SetNodeID(nodeID)
-       alloc.SetInstanceType(node.GetInstanceType())
+       alloc.SetNodeID(targetNodeID)
+       alloc.SetInstanceType(targetNode.GetInstanceType())
 
        // track the number of allocations
        pc.updateAllocationCount(1)
@@ -927,7 +938,7 @@ func (pc *PartitionContext) allocate(result 
*objects.AllocationResult) *objects.
                zap.String("allocationKey", result.Request.GetAllocationKey()),
                zap.Stringer("allocatedResource", 
result.Request.GetAllocatedResource()),
                zap.Bool("placeholder", result.Request.IsPlaceholder()),
-               zap.String("targetNode", alloc.GetNodeID()))
+               zap.String("targetNode", targetNodeID))
        // pass the allocation result back to the RM via the cluster context
        return result
 }
diff --git a/pkg/scheduler/partition_test.go b/pkg/scheduler/partition_test.go
index 79c4cdd5..e436ea71 100644
--- a/pkg/scheduler/partition_test.go
+++ b/pkg/scheduler/partition_test.go
@@ -2327,6 +2327,9 @@ func TestAllocReserveNewNode(t *testing.T) {
        assert.Equal(t, 0, len(node1.GetReservationKeys()), "old node should 
have no more reservations")
        assert.Equal(t, 0, len(app.GetReservations()), "ask should have been 
reserved")
        assertLimits(t, getTestUserGroup(), 
resources.NewResourceFromMap(map[string]resources.Quantity{"vcore": 16000}))
+       alloc2 := node2.GetAllocation("alloc-2")
+       assert.Assert(t, alloc2 != nil, "alloc was nil")
+       assert.Equal(t, nodeID2, alloc2.GetNodeID(), "wrong node id")
 }
 
 func TestTryAllocateReserve(t *testing.T) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to