This is an automated email from the ASF dual-hosted git repository. bteke pushed a commit to branch branch-3.4.0 in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-3.4.0 by this push: new 4461cfae9ea YARN-11801: NPE in FifoCandidatesSelector.selectCandidates when preempting resources for an auto-created queue without child queues (#7657) 4461cfae9ea is described below commit 4461cfae9ea9d48a1307dccf8194b5cd94b0b7ac Author: Susheel Gupta <38013283+susheelgup...@users.noreply.github.com> AuthorDate: Mon May 12 20:04:01 2025 +0530 YARN-11801: NPE in FifoCandidatesSelector.selectCandidates when preempting resources for an auto-created queue without child queues (#7657) --- .../ProportionalCapacityPreemptionPolicy.java | 17 ++-- .../scheduler/capacity/AbstractParentQueue.java | 9 ++ .../TestProportionalCapacityPreemptionPolicy.java | 95 +++++++++++++++------- 3 files changed, 83 insertions(+), 38 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java index 443241a664a..e36b075c789 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java @@ -22,6 +22,7 @@ import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AbstractParentQueue; +import
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AbstractLeafQueue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceStability.Unstable; @@ -40,8 +41,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity - .ManagedParentQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptableQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent; @@ -430,12 +429,14 @@ private void cleanupStaledPreemptionCandidates(long currentTime) { } private Set<String> getLeafQueueNames(TempQueuePerPartition q) { - // Also exclude ParentQueues, which might be without children - if (CollectionUtils.isEmpty(q.children) - && !(q.parentQueue instanceof ManagedParentQueue) - && (q.parentQueue == null - || !q.parentQueue.isEligibleForAutoQueueCreation())) { - return ImmutableSet.of(q.queueName); + // Only consider this a leaf queue if: + // It is a concrete leaf queue (not a childless parent) + if (CollectionUtils.isEmpty(q.children)) { + CSQueue queue = scheduler.getQueue(q.queueName); + if (queue instanceof AbstractLeafQueue) { + return ImmutableSet.of(q.queueName); + } + return Collections.emptySet(); } Set<String> leafQueueNames = new HashSet<>(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractParentQueue.java 
index 50516dd2bc5..111d6bfd9ca 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractParentQueue.java @@ -552,6 +552,15 @@ public boolean isEligibleForAutoQueueCreation() { return isDynamicQueue() || queueContext.getConfiguration(). isAutoQueueCreationV2Enabled(getQueuePath()); } + /** + * Check whether this queue supports legacy(v1) dynamic child queue creation. + * @return true if queue is eligible to create child queues dynamically using + * the legacy system, false otherwise + */ + public boolean isEligibleForLegacyAutoQueueCreation() { + return isDynamicQueue() || queueContext.getConfiguration(). + isAutoCreateChildQueueEnabled(getQueuePath()); + } @Override public void reinitialize(CSQueue newlyParsedQueue, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java index fe89a698cf2..6664f90ce6a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java @@ -1073,44 +1073,75 @@ public void 
testRefreshPreemptionProperties() throws Exception { } @Test - public void testLeafQueueNameExtraction() throws Exception { - ProportionalCapacityPreemptionPolicy policy = - buildPolicy(Q_DATA_FOR_IGNORE); + public void testLeafQueueNameExtractionWithFlexibleAQC() throws Exception { + ProportionalCapacityPreemptionPolicy policy = buildPolicy(Q_DATA_FOR_IGNORE); ParentQueue root = (ParentQueue) mCS.getRootQueue(); + root.addDynamicParentQueue("childlessFlexible"); + ParentQueue dynamicParent = setupDynamicParentQueue("root.dynamicParent", true); + extendRootQueueWithMock(root, dynamicParent); + + policy.editSchedule(); + assertFalse( + "root.dynamicLegacyParent" + " should not be a LeafQueue candidate", + policy.getLeafQueueNames().contains( "root.dynamicParent")); + } + + @Test + public void testLeafQueueNameExtractionWithLegacyAQC() throws Exception { + ProportionalCapacityPreemptionPolicy policy = buildPolicy(Q_DATA_FOR_IGNORE); + ParentQueue root = (ParentQueue) mCS.getRootQueue(); + + root.addDynamicParentQueue("childlessLegacy"); + ParentQueue dynamicParent = setupDynamicParentQueue("root.dynamicLegacyParent", false); + extendRootQueueWithMock(root, dynamicParent); + + policy.editSchedule(); + assertFalse("root.dynamicLegacyParent" + " should not be a LeafQueue candidate", + policy.getLeafQueueNames().contains( "root.dynamicLegacyParent")); + } + + private ParentQueue setupDynamicParentQueue(String queuePath, boolean isFlexible) { + ParentQueue dynamicParent = mockParentQueue(null, 0, new LinkedList<>()); + mockQueueFields(dynamicParent, queuePath); + + if (isFlexible) { + when(dynamicParent.isEligibleForAutoQueueCreation()).thenReturn(true); + } else { + when(dynamicParent.isEligibleForLegacyAutoQueueCreation()).thenReturn(true); + } + + return dynamicParent; + } + + private void extendRootQueueWithMock(ParentQueue root, ParentQueue mockQueue) { List<CSQueue> queues = root.getChildQueues(); ArrayList<CSQueue> extendedQueues = new ArrayList<>(); - 
LinkedList<ParentQueue> pqs = new LinkedList<>(); - ParentQueue dynamicParent = mockParentQueue( - null, 0, pqs); - when(dynamicParent.getQueuePath()).thenReturn("root.dynamicParent"); - when(dynamicParent.getQueueCapacities()).thenReturn( - new QueueCapacities(false)); - QueueResourceQuotas dynamicParentQr = new QueueResourceQuotas(); - dynamicParentQr.setEffectiveMaxResource(Resource.newInstance(1, 1)); - dynamicParentQr.setEffectiveMinResource(Resources.createResource(1)); - dynamicParentQr.setEffectiveMaxResource(RMNodeLabelsManager.NO_LABEL, - Resource.newInstance(1, 1)); - dynamicParentQr.setEffectiveMinResource(RMNodeLabelsManager.NO_LABEL, - Resources.createResource(1)); - when(dynamicParent.getQueueResourceQuotas()).thenReturn(dynamicParentQr); - when(dynamicParent.getEffectiveCapacity(RMNodeLabelsManager.NO_LABEL)) - .thenReturn(Resources.createResource(1)); - when(dynamicParent.getEffectiveMaxCapacity(RMNodeLabelsManager.NO_LABEL)) - .thenReturn(Resource.newInstance(1, 1)); - ResourceUsage resUsage = new ResourceUsage(); - resUsage.setUsed(Resources.createResource(1024)); - resUsage.setReserved(Resources.createResource(1024)); - when(dynamicParent.getQueueResourceUsage()).thenReturn(resUsage); - when(dynamicParent.isEligibleForAutoQueueCreation()).thenReturn(true); - extendedQueues.add(dynamicParent); + extendedQueues.add(mockQueue); extendedQueues.addAll(queues); when(root.getChildQueues()).thenReturn(extendedQueues); + } - policy.editSchedule(); + private void mockQueueFields(ParentQueue queue, String queuePath) { + when(queue.getQueuePath()).thenReturn(queuePath); + when(queue.getQueueCapacities()).thenReturn(new QueueCapacities(false)); + + QueueResourceQuotas qrq = new QueueResourceQuotas(); + qrq.setEffectiveMaxResource(Resource.newInstance(1, 1)); + qrq.setEffectiveMinResource(Resources.createResource(1)); + qrq.setEffectiveMaxResource(RMNodeLabelsManager.NO_LABEL, Resource.newInstance(1, 1)); + 
qrq.setEffectiveMinResource(RMNodeLabelsManager.NO_LABEL, Resources.createResource(1)); - assertFalse("dynamicParent should not be a LeafQueue " + - "candidate", policy.getLeafQueueNames().contains("root.dynamicParent")); + when(queue.getQueueResourceQuotas()).thenReturn(qrq); + when(queue.getEffectiveCapacity(RMNodeLabelsManager.NO_LABEL)) + .thenReturn(Resources.createResource(1)); + when(queue.getEffectiveMaxCapacity(RMNodeLabelsManager.NO_LABEL)) + .thenReturn(Resource.newInstance(1, 1)); + + ResourceUsage usage = new ResourceUsage(); + usage.setUsed(Resources.createResource(1024)); + usage.setReserved(Resources.createResource(1024)); + when(queue.getQueueResourceUsage()).thenReturn(usage); } static class IsPreemptionRequestFor @@ -1359,6 +1390,10 @@ LeafQueue mockLeafQueue(ParentQueue p, Resource tot, int i, Resource[] abs, Resource[] used, Resource[] pending, Resource[] reserved, int[] apps, Resource[] gran) { LeafQueue lq = mock(LeafQueue.class); + + String queuePath = p.getQueuePath() + ".queue" + (char)('A' + i - 1); + when(mCS.getQueue(queuePath)).thenReturn(lq); + ResourceCalculator rc = mCS.getResourceCalculator(); List<ApplicationAttemptId> appAttemptIdList = new ArrayList<ApplicationAttemptId>(); --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org