YARN-8002. Support NOT_SELF and ALL namespace types for allocation tag. (Weiwei Yang via wangda)
Change-Id: I63b4e4192a95bf7ded98c54e46a2871c72869700 Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d199274b Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d199274b Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d199274b Branch: refs/heads/HDFS-12996 Commit: d199274bbefc586f8052da479713c394e28546d2 Parents: 3533cba Author: Wangda Tan <wan...@apache.org> Authored: Mon Mar 19 11:04:27 2018 -0700 Committer: Hanisha Koneru <hanishakon...@apache.org> Committed: Wed Mar 21 16:46:52 2018 -0700 ---------------------------------------------------------------------- .../api/records/AllocationTagNamespace.java | 336 ------------------- .../api/records/AllocationTagNamespaceType.java | 29 -- .../hadoop/yarn/api/records/AllocationTags.java | 50 --- .../hadoop/yarn/api/records/Evaluable.java | 38 --- .../yarn/api/records/TargetApplications.java | 53 --- .../yarn/api/resource/PlacementConstraints.java | 58 +++- .../InvalidAllocationTagException.java | 34 -- .../constraint/AllocationTagNamespace.java | 312 +++++++++++++++++ .../scheduler/constraint/AllocationTags.java | 82 +++++ .../constraint/AllocationTagsManager.java | 146 ++++++-- .../scheduler/constraint/Evaluable.java | 38 +++ .../constraint/PlacementConstraintsUtil.java | 55 +-- .../constraint/TargetApplications.java | 51 +++ .../algorithm/LocalAllocationTagsManager.java | 27 +- .../SingleConstraintAppPlacementAllocator.java | 17 +- .../yarn/server/resourcemanager/MockAM.java | 11 +- .../rmcontainer/TestRMContainerImpl.java | 47 ++- ...estSchedulingRequestContainerAllocation.java | 126 +++++++ .../constraint/TestAllocationTagsManager.java | 272 ++++++++++++--- .../constraint/TestAllocationTagsNamespace.java | 36 +- .../TestPlacementConstraintsUtil.java | 247 +++++++++++++- .../TestLocalAllocationTagsManager.java | 33 +- ...stSingleConstraintAppPlacementAllocator.java | 9 +- 23 files changed, 1383 insertions(+), 724 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/d199274b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AllocationTagNamespace.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AllocationTagNamespace.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AllocationTagNamespace.java deleted file mode 100644 index 25f8761..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AllocationTagNamespace.java +++ /dev/null @@ -1,336 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.yarn.api.records; - -import com.google.common.base.Strings; -import com.google.common.collect.ImmutableSet; -import org.apache.hadoop.yarn.exceptions.InvalidAllocationTagException; - -import java.util.ArrayList; -import java.util.List; -import java.util.Set; - -import static org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType.SELF; -import static org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType.NOT_SELF; -import static org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType.APP_LABEL; -import static org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType.APP_ID; -import static org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType.ALL; -import static org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType.fromString; - -/** - * Class to describe the namespace of an allocation tag. - * Each namespace can be evaluated against a set of applications. - * After evaluation, the namespace should have an implicit set of - * applications which defines its scope. - */ -public abstract class AllocationTagNamespace implements - Evaluable<TargetApplications> { - - public final static String NAMESPACE_DELIMITER = "/"; - - private AllocationTagNamespaceType nsType; - // Namespace scope value will be delay binding by eval method. - private Set<ApplicationId> nsScope; - - public AllocationTagNamespace(AllocationTagNamespaceType - allocationTagNamespaceType) { - this.nsType = allocationTagNamespaceType; - } - - protected void setScopeIfNotNull(Set<ApplicationId> appIds) { - if (appIds != null) { - this.nsScope = appIds; - } - } - - /** - * Get the type of the namespace. - * @return namespace type. - */ - public AllocationTagNamespaceType getNamespaceType() { - return nsType; - } - - /** - * Get the scope of the namespace, in form of a set of applications. - * Before calling this method, {@link #evaluate(TargetApplications)} - * must be called in prior to ensure the scope is proper evaluated. 
- * - * @return a set of applications. - */ - public Set<ApplicationId> getNamespaceScope() { - if (this.nsScope == null) { - throw new IllegalStateException("Invalid namespace scope," - + " it is not initialized. Evaluate must be called before" - + " a namespace can be consumed."); - } - return this.nsScope; - } - - @Override - public abstract void evaluate(TargetApplications target) - throws InvalidAllocationTagException; - - /** - * @return true if the namespace is effective in all applications - * in this cluster. Specifically the namespace prefix should be - * "all". - */ - public boolean isGlobal() { - return AllocationTagNamespaceType.ALL.equals(getNamespaceType()); - } - - /** - * @return true if the namespace is effective within a single application - * by its application ID, the namespace prefix should be "app-id"; - * false otherwise. - */ - public boolean isSingleInterApp() { - return AllocationTagNamespaceType.APP_ID.equals(getNamespaceType()); - } - - /** - * @return true if the namespace is effective to the application itself, - * the namespace prefix should be "self"; false otherwise. - */ - public boolean isIntraApp() { - return AllocationTagNamespaceType.SELF.equals(getNamespaceType()); - } - - /** - * @return true if the namespace is effective to all applications except - * itself, the namespace prefix should be "not-self"; false otherwise. - */ - public boolean isNotSelf() { - return AllocationTagNamespaceType.NOT_SELF.equals(getNamespaceType()); - } - - /** - * @return true if the namespace is effective to a group of applications - * identified by a application label, the namespace prefix should be - * "app-label"; false otherwise. - */ - public boolean isAppLabel() { - return AllocationTagNamespaceType.APP_LABEL.equals(getNamespaceType()); - } - - @Override - public String toString() { - return this.nsType.toString(); - } - - /** - * Namespace within application itself. 
- */ - public static class Self extends AllocationTagNamespace { - - public Self() { - super(SELF); - } - - @Override - public void evaluate(TargetApplications target) - throws InvalidAllocationTagException { - if (target == null || target.getCurrentApplicationId() == null) { - throw new InvalidAllocationTagException("Namespace Self must" - + " be evaluated against a single application ID."); - } - ApplicationId applicationId = target.getCurrentApplicationId(); - setScopeIfNotNull(ImmutableSet.of(applicationId)); - } - } - - /** - * Namespace to all applications except itself. - */ - public static class NotSelf extends AllocationTagNamespace { - - private ApplicationId applicationId; - - public NotSelf() { - super(NOT_SELF); - } - - /** - * The scope of self namespace is to an application itself, - * the application ID can be delay binding to the namespace. - * - * @param appId application ID. - */ - public void setApplicationId(ApplicationId appId) { - this.applicationId = appId; - } - - public ApplicationId getApplicationId() { - return this.applicationId; - } - - @Override - public void evaluate(TargetApplications target) { - Set<ApplicationId> otherAppIds = target.getOtherApplicationIds(); - setScopeIfNotNull(otherAppIds); - } - } - - /** - * Namespace to all applications in the cluster. - */ - public static class All extends AllocationTagNamespace { - - public All() { - super(ALL); - } - - @Override - public void evaluate(TargetApplications target) { - Set<ApplicationId> allAppIds = target.getAllApplicationIds(); - setScopeIfNotNull(allAppIds); - } - } - - /** - * Namespace to all applications in the cluster. - */ - public static class AppLabel extends AllocationTagNamespace { - - public AppLabel() { - super(APP_LABEL); - } - - @Override - public void evaluate(TargetApplications target) { - // TODO Implement app-label namespace evaluation - } - } - - /** - * Namespace defined by a certain application ID. 
- */ - public static class AppID extends AllocationTagNamespace { - - private ApplicationId targetAppId; - // app-id namespace requires an extra value of an application id. - public AppID(ApplicationId applicationId) { - super(APP_ID); - this.targetAppId = applicationId; - } - - @Override - public void evaluate(TargetApplications target) { - setScopeIfNotNull(ImmutableSet.of(targetAppId)); - } - - @Override - public String toString() { - return APP_ID.toString() + NAMESPACE_DELIMITER + this.targetAppId; - } - } - - /** - * Parse namespace from a string. The string must be in legal format - * defined by each {@link AllocationTagNamespaceType}. - * - * @param namespaceStr namespace string. - * @return an instance of {@link AllocationTagNamespace}. - * @throws InvalidAllocationTagException - * if given string is not in valid format - */ - public static AllocationTagNamespace parse(String namespaceStr) - throws InvalidAllocationTagException { - // Return the default namespace if no valid string is given. - if (Strings.isNullOrEmpty(namespaceStr)) { - return new Self(); - } - - // Normalize the input, escape additional chars. - List<String> nsValues = normalize(namespaceStr); - // The first string should be the prefix. 
- String nsPrefix = nsValues.get(0); - AllocationTagNamespaceType allocationTagNamespaceType = - fromString(nsPrefix); - switch (allocationTagNamespaceType) { - case SELF: - return new Self(); - case NOT_SELF: - return new NotSelf(); - case ALL: - return new All(); - case APP_ID: - if (nsValues.size() != 2) { - throw new InvalidAllocationTagException( - "Missing the application ID in the namespace string: " - + namespaceStr); - } - String appIDStr = nsValues.get(1); - return parseAppID(appIDStr); - case APP_LABEL: - return new AppLabel(); - default: - throw new InvalidAllocationTagException( - "Invalid namespace string " + namespaceStr); - } - } - - private static AllocationTagNamespace parseAppID(String appIDStr) - throws InvalidAllocationTagException { - try { - ApplicationId applicationId = ApplicationId.fromString(appIDStr); - return new AppID(applicationId); - } catch (IllegalArgumentException e) { - throw new InvalidAllocationTagException( - "Invalid application ID for " - + APP_ID.getTypeKeyword() + ": " + appIDStr); - } - } - - /** - * Valid given namespace string and parse it to a list of sub-strings - * that can be consumed by the parser according to the type of the - * namespace. Currently the size of return list should be either 1 or 2. - * Extra slash is escaped during the normalization. - * - * @param namespaceStr namespace string. - * @return a list of parsed strings. - * @throws InvalidAllocationTagException - * if namespace format is unexpected. 
- */ - private static List<String> normalize(String namespaceStr) - throws InvalidAllocationTagException { - List<String> result = new ArrayList<>(); - if (namespaceStr == null) { - return result; - } - - String[] nsValues = namespaceStr.split(NAMESPACE_DELIMITER); - for (String str : nsValues) { - if (!Strings.isNullOrEmpty(str)) { - result.add(str); - } - } - - // Currently we only allow 1 or 2 values for a namespace string - if (result.size() == 0 || result.size() > 2) { - throw new InvalidAllocationTagException("Invalid namespace string: " - + namespaceStr + ", the syntax is <namespace_prefix> or" - + " <namespace_prefix>/<namespace_value>"); - } - - return result; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d199274b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AllocationTagNamespaceType.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AllocationTagNamespaceType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AllocationTagNamespaceType.java index 5e46cd0..de5492e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AllocationTagNamespaceType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AllocationTagNamespaceType.java @@ -18,12 +18,6 @@ package org.apache.hadoop.yarn.api.records; -import org.apache.hadoop.yarn.exceptions.InvalidAllocationTagException; - -import java.util.Arrays; -import java.util.Set; -import java.util.stream.Collectors; - /** * Class to describe all supported forms of namespaces for an allocation tag. */ @@ -44,29 +38,6 @@ public enum AllocationTagNamespaceType { return this.typeKeyword; } - /** - * Parses the namespace type from a given string. 
- * @param prefix namespace prefix. - * @return namespace type. - * @throws InvalidAllocationTagException - */ - public static AllocationTagNamespaceType fromString(String prefix) throws - InvalidAllocationTagException { - for (AllocationTagNamespaceType type : - AllocationTagNamespaceType.values()) { - if(type.getTypeKeyword().equals(prefix)) { - return type; - } - } - - Set<String> values = Arrays.stream(AllocationTagNamespaceType.values()) - .map(AllocationTagNamespaceType::toString) - .collect(Collectors.toSet()); - throw new InvalidAllocationTagException( - "Invalid namespace prefix: " + prefix - + ", valid values are: " + String.join(",", values)); - } - @Override public String toString() { return this.getTypeKeyword(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/d199274b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AllocationTags.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AllocationTags.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AllocationTags.java deleted file mode 100644 index 50bffc3..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AllocationTags.java +++ /dev/null @@ -1,50 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.api.records; - -import java.util.Set; - -/** - * Allocation tags under same namespace. - */ -public class AllocationTags { - - private AllocationTagNamespace ns; - private Set<String> tags; - - public AllocationTags(AllocationTagNamespace namespace, - Set<String> allocationTags) { - this.ns = namespace; - this.tags = allocationTags; - } - - /** - * @return the namespace of these tags. - */ - public AllocationTagNamespace getNamespace() { - return this.ns; - } - - /** - * @return the allocation tags. - */ - public Set<String> getTags() { - return this.tags; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/d199274b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Evaluable.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Evaluable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Evaluable.java deleted file mode 100644 index 7a74002..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Evaluable.java +++ /dev/null @@ -1,38 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.api.records; - -import org.apache.hadoop.yarn.exceptions.YarnException; - -/** - * A class implements Evaluable interface represents the internal state - * of the class can be changed against a given target. - * @param <T> a target to evaluate against - */ -public interface Evaluable<T> { - - /** - * Evaluate against a given target, this process changes the internal state - * of current class. - * - * @param target a generic type target that impacts this evaluation. 
- * @throws YarnException - */ - void evaluate(T target) throws YarnException; -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d199274b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/TargetApplications.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/TargetApplications.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/TargetApplications.java deleted file mode 100644 index de0ea26..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/TargetApplications.java +++ /dev/null @@ -1,53 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.api.records; - -import java.util.Set; -import java.util.stream.Collectors; - -/** - * This class is used by - * {@link AllocationTagNamespace#evaluate(TargetApplications)} to evaluate - * a namespace. 
- */ -public class TargetApplications { - - private ApplicationId currentAppId; - private Set<ApplicationId> allAppIds; - - public TargetApplications(ApplicationId currentApplicationId, - Set<ApplicationId> allApplicationIds) { - this.currentAppId = currentApplicationId; - this.allAppIds = allApplicationIds; - } - - public Set<ApplicationId> getAllApplicationIds() { - return this.allAppIds; - } - - public ApplicationId getCurrentApplicationId() { - return this.currentAppId; - } - - public Set<ApplicationId> getOtherApplicationIds() { - return allAppIds == null ? null : allAppIds.stream().filter(appId -> - !appId.equals(getCurrentApplicationId())) - .collect(Collectors.toSet()); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d199274b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/resource/PlacementConstraints.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/resource/PlacementConstraints.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/resource/PlacementConstraints.java index af70e2a..02138bd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/resource/PlacementConstraints.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/resource/PlacementConstraints.java @@ -22,7 +22,7 @@ import java.util.concurrent.TimeUnit; import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Unstable; -import org.apache.hadoop.yarn.api.records.AllocationTagNamespace; +import org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType; import org.apache.hadoop.yarn.api.resource.PlacementConstraint.AbstractConstraint; import org.apache.hadoop.yarn.api.resource.PlacementConstraint.And; import 
org.apache.hadoop.yarn.api.resource.PlacementConstraint.DelayedOr; @@ -108,6 +108,25 @@ public final class PlacementConstraints { } /** + * Similar to {@link #cardinality(String, int, int, String...)}, but let you + * attach a namespace to the given allocation tags. + * + * @param scope the scope of the constraint + * @param namespace the namespace of the allocation tags + * @param minCardinality determines the minimum number of allocations within + * the scope + * @param maxCardinality determines the maximum number of allocations within + * the scope + * @param allocationTags allocation tags + * @return the resulting placement constraint + */ + public static AbstractConstraint cardinality(String scope, String namespace, + int minCardinality, int maxCardinality, String... allocationTags) { + return new SingleConstraint(scope, minCardinality, maxCardinality, + PlacementTargets.allocationTagWithNamespace(namespace, allocationTags)); + } + + /** * Similar to {@link #cardinality(String, int, int, String...)}, but * determines only the minimum cardinality (the maximum cardinality is * unbound). @@ -125,6 +144,23 @@ public final class PlacementConstraints { } /** + * Similar to {@link #minCardinality(String, int, String...)}, but let you + * attach a namespace to the allocation tags. + * + * @param scope the scope of the constraint + * @param namespace the namespace of these tags + * @param minCardinality determines the minimum number of allocations within + * the scope + * @param allocationTags the constraint targets allocations with these tags + * @return the resulting placement constraint + */ + public static AbstractConstraint minCardinality(String scope, + String namespace, int minCardinality, String... allocationTags) { + return cardinality(scope, namespace, minCardinality, Integer.MAX_VALUE, + allocationTags); + } + + /** * Similar to {@link #cardinality(String, int, int, String...)}, but * determines only the maximum cardinality (the minimum cardinality is 0). 
* @@ -140,6 +176,23 @@ public final class PlacementConstraints { } /** + * Similar to {@link #maxCardinality(String, int, String...)}, but let you + * specify a namespace for the tags, see supported namespaces in + * {@link AllocationTagNamespaceType}. + * + * @param scope the scope of the constraint + * @param tagNamespace the namespace of these tags + * @param maxCardinality determines the maximum number of allocations within + * the scope + * @param allocationTags allocation tags + * @return the resulting placement constraint + */ + public static AbstractConstraint maxCardinality(String scope, + String tagNamespace, int maxCardinality, String... allocationTags) { + return cardinality(scope, tagNamespace, 0, maxCardinality, allocationTags); + } + + /** * This constraint generalizes the cardinality and target constraints. * * Consider a set of nodes N that belongs to the scope specified in the @@ -242,9 +295,8 @@ public final class PlacementConstraints { */ public static TargetExpression allocationTagToIntraApp( String... 
allocationTags) { - AllocationTagNamespace selfNs = new AllocationTagNamespace.Self(); return new TargetExpression(TargetType.ALLOCATION_TAG, - selfNs.toString(), allocationTags); + AllocationTagNamespaceType.SELF.toString(), allocationTags); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/d199274b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/InvalidAllocationTagException.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/InvalidAllocationTagException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/InvalidAllocationTagException.java deleted file mode 100644 index be8d881..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/InvalidAllocationTagException.java +++ /dev/null @@ -1,34 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.yarn.exceptions; - -/** - * This exception is thrown by - * {@link - * org.apache.hadoop.yarn.api.records.AllocationTagNamespace#parse(String)} - * when it fails to parse a namespace. - */ -public class InvalidAllocationTagException extends YarnException { - - private static final long serialVersionUID = 1L; - - public InvalidAllocationTagException(String message) { - super(message); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d199274b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagNamespace.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagNamespace.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagNamespace.java new file mode 100644 index 0000000..7b9f3be --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagNamespace.java @@ -0,0 +1,312 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint; + +import com.google.common.base.Strings; +import com.google.common.collect.ImmutableSet; +import org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType; +import org.apache.hadoop.yarn.api.records.ApplicationId; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType.SELF; +import static org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType.NOT_SELF; +import static org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType.APP_LABEL; +import static org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType.APP_ID; +import static org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType.ALL; + +/** + * Class to describe the namespace of an allocation tag. + * Each namespace can be evaluated against a set of applications. + * After evaluation, the namespace should have an implicit set of + * applications which defines its scope. + */ +public abstract class AllocationTagNamespace implements + Evaluable<TargetApplications> { + + public final static String NAMESPACE_DELIMITER = "/"; + + private AllocationTagNamespaceType nsType; + // Namespace scope value will be delay binding by eval method. 
+ private Set<ApplicationId> nsScope; + + public AllocationTagNamespace(AllocationTagNamespaceType + allocationTagNamespaceType) { + this.nsType = allocationTagNamespaceType; + } + + protected void setScopeIfNotNull(Set<ApplicationId> appIds) { + if (appIds != null) { + this.nsScope = appIds; + } + } + + /** + * Get the type of the namespace. + * @return namespace type. + */ + public AllocationTagNamespaceType getNamespaceType() { + return nsType; + } + + /** + * Get the scope of the namespace, in form of a set of applications. + * + * @return a set of applications. + */ + public Set<ApplicationId> getNamespaceScope() { + if (this.nsScope == null) { + throw new IllegalStateException("Invalid namespace scope," + + " it is not initialized. Evaluate must be called before" + + " a namespace can be consumed."); + } + return this.nsScope; + } + + /** + * Evaluate the namespace against given target applications + * if it is necessary. Only self/not-self/app-label namespace types + * require this evaluation step, because they are not binding to a + * specific scope during initiating. So we do lazy binding for them + * in this method. + * + * @param target a generic type target that impacts this evaluation. + * @throws InvalidAllocationTagsQueryException + */ + @Override + public void evaluate(TargetApplications target) + throws InvalidAllocationTagsQueryException { + // Sub-class needs to override this when it requires the eval step. + } + + @Override + public String toString() { + return this.nsType.toString(); + } + + /** + * Namespace within application itself. 
+ */ + public static class Self extends AllocationTagNamespace { + + public Self() { + super(SELF); + } + + @Override + public void evaluate(TargetApplications target) + throws InvalidAllocationTagsQueryException { + if (target == null || target.getCurrentApplicationId() == null) { + throw new InvalidAllocationTagsQueryException("Namespace Self must" + + " be evaluated against a single application ID."); + } + ApplicationId applicationId = target.getCurrentApplicationId(); + setScopeIfNotNull(ImmutableSet.of(applicationId)); + } + } + + /** + * Namespace to all applications except itself. + */ + public static class NotSelf extends AllocationTagNamespace { + + private ApplicationId applicationId; + + public NotSelf() { + super(NOT_SELF); + } + + /** + * The scope of the not-self namespace excludes the application itself; + * the application ID can be bound to the namespace lazily. + * + * @param appId application ID. + */ + public void setApplicationId(ApplicationId appId) { + this.applicationId = appId; + } + + public ApplicationId getApplicationId() { + return this.applicationId; + } + + @Override + public void evaluate(TargetApplications target) { + Set<ApplicationId> otherAppIds = target.getOtherApplicationIds(); + setScopeIfNotNull(otherAppIds); + } + } + + /** + * Namespace to all applications in the cluster. + */ + public static class All extends AllocationTagNamespace { + + public All() { + super(ALL); + } + } + + /** + * Namespace defined by an application label (evaluation is not implemented yet). + */ + public static class AppLabel extends AllocationTagNamespace { + + public AppLabel() { + super(APP_LABEL); + } + + @Override + public void evaluate(TargetApplications target) { + // TODO Implement app-label namespace evaluation + } + } + + /** + * Namespace defined by a certain application ID. + */ + public static class AppID extends AllocationTagNamespace { + + private ApplicationId targetAppId; + // app-id namespace requires an extra value of an application id. 
+ public AppID(ApplicationId applicationId) { + super(APP_ID); + this.targetAppId = applicationId; + setScopeIfNotNull(ImmutableSet.of(targetAppId)); + } + + @Override + public String toString() { + return APP_ID.toString() + NAMESPACE_DELIMITER + this.targetAppId; + } + } + + /** + * Parse namespace from a string. The string must be in legal format + * defined by each {@link AllocationTagNamespaceType}. + * + * @param namespaceStr namespace string. + * @return an instance of {@link AllocationTagNamespace}. + * @throws InvalidAllocationTagsQueryException + * if given string is not in valid format + */ + public static AllocationTagNamespace parse(String namespaceStr) + throws InvalidAllocationTagsQueryException { + // Return the default namespace if no valid string is given. + if (Strings.isNullOrEmpty(namespaceStr)) { + return new Self(); + } + + // Normalize the input, escape additional chars. + List<String> nsValues = normalize(namespaceStr); + // The first string should be the prefix. 
+ String nsPrefix = nsValues.get(0); + AllocationTagNamespaceType allocationTagNamespaceType = + fromString(nsPrefix); + switch (allocationTagNamespaceType) { + case SELF: + return new Self(); + case NOT_SELF: + return new NotSelf(); + case ALL: + return new All(); + case APP_ID: + if (nsValues.size() != 2) { + throw new InvalidAllocationTagsQueryException( + "Missing the application ID in the namespace string: " + + namespaceStr); + } + String appIDStr = nsValues.get(1); + return parseAppID(appIDStr); + case APP_LABEL: + return new AppLabel(); + default: + throw new InvalidAllocationTagsQueryException( + "Invalid namespace string " + namespaceStr); + } + } + + private static AllocationTagNamespaceType fromString(String prefix) throws + InvalidAllocationTagsQueryException { + for (AllocationTagNamespaceType type : + AllocationTagNamespaceType.values()) { + if(type.getTypeKeyword().equals(prefix)) { + return type; + } + } + + Set<String> values = Arrays.stream(AllocationTagNamespaceType.values()) + .map(AllocationTagNamespaceType::toString) + .collect(Collectors.toSet()); + throw new InvalidAllocationTagsQueryException( + "Invalid namespace prefix: " + prefix + + ", valid values are: " + String.join(",", values)); + } + + private static AllocationTagNamespace parseAppID(String appIDStr) + throws InvalidAllocationTagsQueryException { + try { + ApplicationId applicationId = ApplicationId.fromString(appIDStr); + return new AppID(applicationId); + } catch (IllegalArgumentException e) { + throw new InvalidAllocationTagsQueryException( + "Invalid application ID for " + + APP_ID.getTypeKeyword() + ": " + appIDStr); + } + } + + /** + * Validate the given namespace string and parse it into a list of sub-strings + * that can be consumed by the parser according to the type of the + * namespace. Currently the size of the returned list should be either 1 or 2. + * Extra slashes are dropped during the normalization. + * + * @param namespaceStr namespace string. 
+ * @return a list of parsed strings. + * @throws InvalidAllocationTagsQueryException + * if namespace format is unexpected. + */ + private static List<String> normalize(String namespaceStr) + throws InvalidAllocationTagsQueryException { + List<String> result = new ArrayList<>(); + if (namespaceStr == null) { + return result; + } + + String[] nsValues = namespaceStr.split(NAMESPACE_DELIMITER); + for (String str : nsValues) { + if (!Strings.isNullOrEmpty(str)) { + result.add(str); + } + } + + // Currently we only allow 1 or 2 values for a namespace string + if (result.size() == 0 || result.size() > 2) { + throw new InvalidAllocationTagsQueryException("Invalid namespace string: " + + namespaceStr + ", the syntax is <namespace_prefix> or" + + " <namespace_prefix>/<namespace_value>"); + } + + return result; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d199274b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTags.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTags.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTags.java new file mode 100644 index 0000000..dc0237e --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTags.java @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableSet; +import org.apache.hadoop.yarn.api.records.ApplicationId; + +import java.util.Set; + +/** + * Allocation tags under same namespace. + */ +public final class AllocationTags { + + private AllocationTagNamespace ns; + private Set<String> tags; + + private AllocationTags(AllocationTagNamespace namespace, + Set<String> allocationTags) { + this.ns = namespace; + this.tags = allocationTags; + } + + /** + * @return the namespace of these tags. + */ + public AllocationTagNamespace getNamespace() { + return this.ns; + } + + /** + * @return the allocation tags. 
+ */ + public Set<String> getTags() { + return this.tags; + } + + @VisibleForTesting + public static AllocationTags createSingleAppAllocationTags( + ApplicationId appId, Set<String> tags) { + AllocationTagNamespace namespace = new AllocationTagNamespace.AppID(appId); + return new AllocationTags(namespace, tags); + } + + @VisibleForTesting + public static AllocationTags createGlobalAllocationTags(Set<String> tags) { + AllocationTagNamespace namespace = new AllocationTagNamespace.All(); + return new AllocationTags(namespace, tags); + } + + @VisibleForTesting + public static AllocationTags createOtherAppAllocationTags( + ApplicationId currentApp, Set<ApplicationId> allIds, Set<String> tags) + throws InvalidAllocationTagsQueryException { + AllocationTagNamespace namespace = new AllocationTagNamespace.NotSelf(); + TargetApplications ta = new TargetApplications(currentApp, allIds); + namespace.evaluate(ta); + return new AllocationTags(namespace, tags); + } + + public static AllocationTags newAllocationTags( + AllocationTagNamespace namespace, Set<String> tags) { + return new AllocationTags(namespace, tags); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/d199274b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsManager.java index fb2619a..830566a 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsManager.java @@ -22,9 +22,11 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Maps; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; @@ -32,6 +34,7 @@ import org.apache.hadoop.yarn.api.records.SchedulingRequest; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.log4j.Logger; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Set; @@ -75,6 +78,12 @@ public class AllocationTagsManager { // Map<Type, Map<Tag, Count>> private Map<T, Map<String, Long>> typeToTagsWithCount = new HashMap<>(); + public TypeToCountedTags() {} + + private TypeToCountedTags(Map<T, Map<String, Long>> tags) { + this.typeToTagsWithCount = tags; + } + // protected by external locks private void addTags(T type, Set<String> tags) { Map<String, Long> innerMap = @@ -206,6 +215,52 @@ public class AllocationTagsManager { public Map<T, Map<String, Long>> getTypeToTagsWithCount() { return typeToTagsWithCount; } + + /** + * Absorbs the given {@link TypeToCountedTags} to current mapping, + * this will aggregate the count of the tags with same name. 
+ * + * @param target a {@link TypeToCountedTags} to merge with. + */ + protected void absorb(final TypeToCountedTags<T> target) { + // No-op if the given target is null. + if (target == null || target.getTypeToTagsWithCount() == null) { + return; + } + + // Merge the target. + Map<T, Map<String, Long>> targetMap = target.getTypeToTagsWithCount(); + for (Map.Entry<T, Map<String, Long>> targetEntry : + targetMap.entrySet()) { + // Get a mutable copy, do not modify the target reference. + Map<String, Long> copy = Maps.newHashMap(targetEntry.getValue()); + + // If the target type doesn't exist in the current mapping, + // add as a new entry. + Map<String, Long> existingMapping = + this.typeToTagsWithCount.putIfAbsent(targetEntry.getKey(), copy); + // There was a mapping for this target type, + // so merge the tag counts by summing them. + if (existingMapping != null) { + Map<String, Long> localMap = + this.typeToTagsWithCount.get(targetEntry.getKey()); + // Merge the target map to the inner map. + Map<String, Long> targetValue = targetEntry.getValue(); + for (Map.Entry<String, Long> entry : targetValue.entrySet()) { + localMap.merge(entry.getKey(), entry.getValue(), + (a, b) -> Long.sum(a, b)); + } + } + } + } + + /** + * @return a read-only view of this instance; the underlying maps are shared. + */ + protected TypeToCountedTags immutableCopy() { + return new TypeToCountedTags( + Collections.unmodifiableMap(this.typeToTagsWithCount)); + } } @VisibleForTesting @@ -236,6 +291,34 @@ public class AllocationTagsManager { } /** + * Aggregates multiple {@link TypeToCountedTags} to a single one based on + * a given set of application IDs, the values are properly merged. + * + * @param appIds a set of application IDs. + * @return an aggregated {@link TypeToCountedTags}. 
+ */ + private TypeToCountedTags aggregateAllocationTags(Set<ApplicationId> appIds, + Map<ApplicationId, TypeToCountedTags> mapping) { + TypeToCountedTags result = new TypeToCountedTags(); + if (appIds != null) { + if (appIds.size() == 1) { + // If there is only one app, we simply return the mapping + // without any extra computation. + return mapping.get(appIds.iterator().next()); + } + + for (ApplicationId applicationId : appIds) { + TypeToCountedTags appIdTags = mapping.get(applicationId); + if (appIdTags != null) { + // Make sure ATM state won't be changed. + result.absorb(appIdTags.immutableCopy()); + } + } + } + return result; + } + + /** * Notify container allocated on a node. * * @param nodeId allocated node. @@ -458,9 +541,8 @@ public class AllocationTagsManager { * to implement customized logic. * * @param nodeId nodeId, required. - * @param applicationId applicationId. When null is specified, return - * aggregated cardinality among all applications. - * @param tags allocation tags, see + * @param tags {@link AllocationTags}, allocation tags under a + * specific namespace. See * {@link SchedulingRequest#getAllocationTags()}, * When multiple tags specified. Returns cardinality * depends on op. 
If a specified tag doesn't exist, 0 @@ -474,29 +556,28 @@ public class AllocationTagsManager { * @throws InvalidAllocationTagsQueryException when illegal query * parameter specified */ - public long getNodeCardinalityByOp(NodeId nodeId, ApplicationId applicationId, - Set<String> tags, LongBinaryOperator op) - throws InvalidAllocationTagsQueryException { + public long getNodeCardinalityByOp(NodeId nodeId, AllocationTags tags, + LongBinaryOperator op) throws InvalidAllocationTagsQueryException { readLock.lock(); - try { - if (nodeId == null || op == null) { + if (nodeId == null || op == null || tags == null) { throw new InvalidAllocationTagsQueryException( "Must specify nodeId/tags/op to query cardinality"); } TypeToCountedTags mapping; - if (applicationId != null) { - mapping = perAppNodeMappings.get(applicationId); - } else { + if (AllocationTagNamespaceType.ALL.equals( + tags.getNamespace().getNamespaceType())) { mapping = globalNodeMapping; + } else { + // Aggregate app tags cardinality by applications. + mapping = aggregateAllocationTags( + tags.getNamespace().getNamespaceScope(), + perAppNodeMappings); } - if (mapping == null) { - return 0; - } - - return mapping.getCardinality(nodeId, tags, op); + return mapping == null ? 0 : + mapping.getCardinality(nodeId, tags.getTags(), op); } finally { readLock.unlock(); } @@ -507,9 +588,8 @@ public class AllocationTagsManager { * to implement customized logic. * * @param rack rack, required. - * @param applicationId applicationId. When null is specified, return - * aggregated cardinality among all applications. - * @param tags allocation tags, see + * @param tags {@link AllocationTags}, allocation tags under a + * specific namespace. See * {@link SchedulingRequest#getAllocationTags()}, * When multiple tags specified. Returns cardinality * depends on op. 
If a specified tag doesn't exist, 0 @@ -523,30 +603,28 @@ public class AllocationTagsManager { * @throws InvalidAllocationTagsQueryException when illegal query * parameter specified */ - @SuppressWarnings("unchecked") - public long getRackCardinalityByOp(String rack, ApplicationId applicationId, - Set<String> tags, LongBinaryOperator op) - throws InvalidAllocationTagsQueryException { + public long getRackCardinalityByOp(String rack, AllocationTags tags, + LongBinaryOperator op) throws InvalidAllocationTagsQueryException { readLock.lock(); - try { - if (rack == null || op == null) { + if (rack == null || op == null || tags == null) { throw new InvalidAllocationTagsQueryException( - "Must specify rack/tags/op to query cardinality"); + "Must specify nodeId/tags/op to query cardinality"); } TypeToCountedTags mapping; - if (applicationId != null) { - mapping = perAppRackMappings.get(applicationId); - } else { + if (AllocationTagNamespaceType.ALL.equals( + tags.getNamespace().getNamespaceType())) { mapping = globalRackMapping; + } else { + // Aggregates cardinality by rack. + mapping = aggregateAllocationTags( + tags.getNamespace().getNamespaceScope(), + perAppRackMappings); } - if (mapping == null) { - return 0; - } - - return mapping.getCardinality(rack, tags, op); + return mapping == null ? 
0 : + mapping.getCardinality(rack, tags.getTags(), op); } finally { readLock.unlock(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/d199274b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/Evaluable.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/Evaluable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/Evaluable.java new file mode 100644 index 0000000..6a7e54e --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/Evaluable.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint; + +import org.apache.hadoop.yarn.exceptions.YarnException; + +/** + * A class that implements the Evaluable interface has internal state + * that can be changed by evaluating it against a given target. + * @param <T> a target to evaluate against + */ +public interface Evaluable<T> { + + /** + * Evaluate against a given target; this process changes the internal state + * of the current instance. + * + * @param target a generic type target that impacts this evaluation. + * @throws YarnException + */ + void evaluate(T target) throws YarnException; +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d199274b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java index 2d0e95a..389fc5c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java @@ -24,11 +24,9 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Public; import 
org.apache.hadoop.classification.InterfaceStability.Unstable; -import org.apache.hadoop.yarn.api.records.AllocationTagNamespace; import org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.SchedulingRequest; -import org.apache.hadoop.yarn.api.records.TargetApplications; import org.apache.hadoop.yarn.api.resource.PlacementConstraint; import org.apache.hadoop.yarn.api.resource.PlacementConstraint.AbstractConstraint; import org.apache.hadoop.yarn.api.resource.PlacementConstraint.And; @@ -38,7 +36,6 @@ import org.apache.hadoop.yarn.api.resource.PlacementConstraint.TargetExpression; import org.apache.hadoop.yarn.api.resource.PlacementConstraint.TargetExpression.TargetType; import org.apache.hadoop.yarn.api.resource.PlacementConstraintTransformations.SingleConstraintTransformer; import org.apache.hadoop.yarn.api.resource.PlacementConstraints; -import org.apache.hadoop.yarn.exceptions.InvalidAllocationTagException; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.algorithm.DefaultPlacementAlgorithm; @@ -70,43 +67,25 @@ public final class PlacementConstraintsUtil { */ private static AllocationTagNamespace getAllocationTagNamespace( ApplicationId currentAppId, String targetKey, AllocationTagsManager atm) - throws InvalidAllocationTagException{ + throws InvalidAllocationTagsQueryException { // Parse to a valid namespace. 
AllocationTagNamespace namespace = AllocationTagNamespace.parse(targetKey); - // TODO remove such check once we support all forms of namespaces - if (!namespace.isIntraApp() && !namespace.isSingleInterApp()) { - throw new InvalidAllocationTagException( - "Only support " + AllocationTagNamespaceType.SELF.toString() - + " and "+ AllocationTagNamespaceType.APP_ID + " now," - + namespace.toString() + " is not supported yet!"); + // TODO Complete remove this check once we support app-label. + if (AllocationTagNamespaceType.APP_LABEL + .equals(namespace.getNamespaceType())) { + throw new InvalidAllocationTagsQueryException( + namespace.toString() + " is not supported yet!"); } // Evaluate the namespace according to the given target // before it can be consumed. - TargetApplications ta = new TargetApplications(currentAppId, - atm.getAllApplicationIds()); + TargetApplications ta = + new TargetApplications(currentAppId, atm.getAllApplicationIds()); namespace.evaluate(ta); return namespace; } - // We return a single app Id now, because at present, - // only self and app-id namespace is supported. But moving on, - // this will return a set of application IDs. - // TODO support other forms of namespaces - private static ApplicationId getNamespaceScope( - AllocationTagNamespace namespace) - throws InvalidAllocationTagException { - if (namespace.getNamespaceScope() == null - || namespace.getNamespaceScope().size() != 1) { - throw new InvalidAllocationTagException( - "Invalid allocation tag namespace " + namespace.toString() - + ", expecting it is not null and only 1 application" - + " ID in the scope."); - } - return namespace.getNamespaceScope().iterator().next(); - } - /** * Returns true if <b>single</b> placement constraint with associated * allocationTags and scope is satisfied by a specific scheduler Node. 
@@ -128,14 +107,10 @@ public final class PlacementConstraintsUtil { // Parse the allocation tag's namespace from the given target key, // then evaluate the namespace and get its scope, // which is represented by one or more application IDs. - ApplicationId effectiveAppID; - try { - AllocationTagNamespace namespace = getAllocationTagNamespace( + AllocationTagNamespace namespace = getAllocationTagNamespace( targetApplicationId, te.getTargetKey(), tm); - effectiveAppID = getNamespaceScope(namespace); - } catch (InvalidAllocationTagException e) { - throw new InvalidAllocationTagsQueryException(e); - } + AllocationTags allocationTags = AllocationTags + .newAllocationTags(namespace, te.getTargetValues()); long minScopeCardinality = 0; long maxScopeCardinality = 0; @@ -149,20 +124,20 @@ public final class PlacementConstraintsUtil { if (sc.getScope().equals(PlacementConstraints.NODE)) { if (checkMinCardinality) { minScopeCardinality = tm.getNodeCardinalityByOp(node.getNodeID(), - effectiveAppID, te.getTargetValues(), Long::max); + allocationTags, Long::max); } if (checkMaxCardinality) { maxScopeCardinality = tm.getNodeCardinalityByOp(node.getNodeID(), - effectiveAppID, te.getTargetValues(), Long::min); + allocationTags, Long::min); } } else if (sc.getScope().equals(PlacementConstraints.RACK)) { if (checkMinCardinality) { minScopeCardinality = tm.getRackCardinalityByOp(node.getRackName(), - effectiveAppID, te.getTargetValues(), Long::max); + allocationTags, Long::max); } if (checkMaxCardinality) { maxScopeCardinality = tm.getRackCardinalityByOp(node.getRackName(), - effectiveAppID, te.getTargetValues(), Long::min); + allocationTags, Long::min); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/d199274b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TargetApplications.java ---------------------------------------------------------------------- diff 
--git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TargetApplications.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TargetApplications.java new file mode 100644 index 0000000..0de7c9e --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TargetApplications.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint; + +import org.apache.hadoop.yarn.api.records.ApplicationId; + +import java.util.Set; +import java.util.stream.Collectors; + +/** + * This class is used by + * {@link AllocationTagNamespace#evaluate(TargetApplications)} to evaluate + * a namespace. 
+ */ +public class TargetApplications { + + private ApplicationId currentAppId; + private Set<ApplicationId> allAppIds; + + public TargetApplications(ApplicationId currentApplicationId, + Set<ApplicationId> allApplicationIds) { + this.currentAppId = currentApplicationId; + this.allAppIds = allApplicationIds; + } + + public ApplicationId getCurrentApplicationId() { + return this.currentAppId; + } + + public Set<ApplicationId> getOtherApplicationIds() { + return allAppIds == null ? null : allAppIds.stream().filter(appId -> + !appId.equals(getCurrentApplicationId())) + .collect(Collectors.toSet()); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d199274b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/LocalAllocationTagsManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/LocalAllocationTagsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/LocalAllocationTagsManager.java index 9472719..1fce466 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/LocalAllocationTagsManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/LocalAllocationTagsManager.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.algorithm; import org.apache.commons.lang.StringUtils; +import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTags; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; @@ -139,29 +140,27 @@ class LocalAllocationTagsManager extends AllocationTagsManager { } @Override - public long getRackCardinality(String rack, ApplicationId applicationId, - String tag) throws InvalidAllocationTagsQueryException { - return tagsManager.getRackCardinality(rack, applicationId, tag); + public long getNodeCardinalityByOp(NodeId nodeId, AllocationTags tags, + LongBinaryOperator op) throws InvalidAllocationTagsQueryException { + return tagsManager.getNodeCardinalityByOp(nodeId, tags, op); } @Override - public boolean allocationTagExistsOnNode(NodeId nodeId, - ApplicationId applicationId, String tag) - throws InvalidAllocationTagsQueryException { - return tagsManager.allocationTagExistsOnNode(nodeId, applicationId, tag); + public long getRackCardinality(String rack, ApplicationId applicationId, + String tag) throws InvalidAllocationTagsQueryException { + return tagsManager.getRackCardinality(rack, applicationId, tag); } @Override - public long getNodeCardinalityByOp(NodeId nodeId, - ApplicationId applicationId, Set<String> tags, LongBinaryOperator op) - throws InvalidAllocationTagsQueryException { - return tagsManager.getNodeCardinalityByOp(nodeId, applicationId, tags, op); + public long getRackCardinalityByOp(String rack, AllocationTags tags, + LongBinaryOperator op) throws InvalidAllocationTagsQueryException { + return tagsManager.getRackCardinalityByOp(rack, tags, op); } @Override - public long getRackCardinalityByOp(String rack, ApplicationId applicationId, - Set<String> tags, LongBinaryOperator op) + public boolean allocationTagExistsOnNode(NodeId nodeId, + ApplicationId applicationId, String tag) throws InvalidAllocationTagsQueryException { - return tagsManager.getRackCardinalityByOp(rack, applicationId, 
tags, op); + return tagsManager.allocationTagExistsOnNode(nodeId, applicationId, tag); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/d199274b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SingleConstraintAppPlacementAllocator.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SingleConstraintAppPlacementAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SingleConstraintAppPlacementAllocator.java index 7e5506e..9004110 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SingleConstraintAppPlacementAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SingleConstraintAppPlacementAllocator.java @@ -23,7 +23,7 @@ import org.apache.commons.collections.IteratorUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.util.StringUtils; -import org.apache.hadoop.yarn.api.records.AllocationTagNamespace; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagNamespace; import org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType; import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.ResourceRequest; @@ -32,7 +32,6 @@ import org.apache.hadoop.yarn.api.records.SchedulingRequest; import 
org.apache.hadoop.yarn.api.records.impl.pb.SchedulingRequestPBImpl; import org.apache.hadoop.yarn.api.resource.PlacementConstraint; import org.apache.hadoop.yarn.api.resource.PlacementConstraints; -import org.apache.hadoop.yarn.exceptions.InvalidAllocationTagException; import org.apache.hadoop.yarn.exceptions.SchedulerInvalidResoureRequestException; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; @@ -339,18 +338,18 @@ public class SingleConstraintAppPlacementAllocator<N extends SchedulerNode> try { AllocationTagNamespace tagNS = AllocationTagNamespace.parse(targetExpression.getTargetKey()); - if (!AllocationTagNamespaceType.SELF + if (AllocationTagNamespaceType.APP_LABEL .equals(tagNS.getNamespaceType())) { throwExceptionWithMetaInfo( - "As of now, the only accepted target key for targetKey of " - + "allocation_tag target expression is: [" - + AllocationTagNamespaceType.SELF.toString() - + "]. Please make changes to placement constraints " - + "accordingly. If this is null, it will be set to " + "As of now, allocation tag namespace [" + + AllocationTagNamespaceType.APP_LABEL.toString() + + "] is not supported. Please make changes to placement " + + "constraints accordingly. 
If this is null, it will be " + + "set to " + AllocationTagNamespaceType.SELF.toString() + " by default."); } - } catch (InvalidAllocationTagException e) { + } catch (InvalidAllocationTagsQueryException e) { throwExceptionWithMetaInfo( "Invalid allocation tag namespace, message: " + e.getMessage()); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/d199274b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java index 2ed201c..5eb667e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java @@ -305,6 +305,14 @@ public class MockAM { public AllocateResponse allocateIntraAppAntiAffinity( ResourceSizing resourceSizing, Priority priority, long allocationId, Set<String> allocationTags, String... targetTags) throws Exception { + return allocateAppAntiAffinity(resourceSizing, priority, allocationId, + null, allocationTags, targetTags); + } + + public AllocateResponse allocateAppAntiAffinity( + ResourceSizing resourceSizing, Priority priority, long allocationId, + String namespace, Set<String> allocationTags, String... 
targetTags) + throws Exception { return this.allocate(null, Arrays.asList(SchedulingRequest.newBuilder().executionType( ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED)) @@ -313,7 +321,8 @@ public class MockAM { PlacementConstraints .targetNotIn(PlacementConstraints.NODE, PlacementConstraints.PlacementTargets - .allocationTagToIntraApp(targetTags)).build()) + .allocationTagWithNamespace(namespace, targetTags)) + .build()) .resourceSizing(resourceSizing).build()), null); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/d199274b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java index 27c5fbd..7a930cd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java @@ -63,6 +63,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAt import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; +import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTags; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagsManager; import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.server.utils.BuilderUtils; @@ -428,20 +430,27 @@ public class TestRMContainerImpl { rmContainer.setAllocationTags(ImmutableSet.of("mapper")); Assert.assertEquals(0, - tagsManager.getNodeCardinalityByOp(nodeId, appId, null, Long::max)); + tagsManager.getNodeCardinalityByOp(nodeId, + AllocationTags.createSingleAppAllocationTags( + TestUtils.getMockApplicationId(1), null), + Long::max)); rmContainer.handle(new RMContainerEvent(containerId, RMContainerEventType.START)); Assert.assertEquals(1, - tagsManager.getNodeCardinalityByOp(nodeId, appId, null, Long::max)); + tagsManager.getNodeCardinalityByOp(nodeId, + AllocationTags.createSingleAppAllocationTags(appId, null), + Long::max)); rmContainer.handle(new RMContainerFinishedEvent(containerId, ContainerStatus .newInstance(containerId, ContainerState.COMPLETE, "", 0), RMContainerEventType.KILL)); Assert.assertEquals(0, - tagsManager.getNodeCardinalityByOp(nodeId, appId, null, Long::max)); + tagsManager.getNodeCardinalityByOp(nodeId, + AllocationTags.createSingleAppAllocationTags(appId, null), + Long::max)); /* Second container: ACQUIRED -> FINISHED */ rmContainer = new RMContainerImpl(container, @@ -449,14 +458,18 @@ public class TestRMContainerImpl { nodeId, "user", rmContext); Assert.assertEquals(0, - tagsManager.getNodeCardinalityByOp(nodeId, appId, null, Long::max)); + tagsManager.getNodeCardinalityByOp(nodeId, + AllocationTags.createSingleAppAllocationTags(appId, null), + Long::max)); rmContainer.setAllocationTags(ImmutableSet.of("mapper")); rmContainer.handle(new RMContainerEvent(containerId, RMContainerEventType.START)); Assert.assertEquals(1, - 
tagsManager.getNodeCardinalityByOp(nodeId, appId, null, Long::max)); + tagsManager.getNodeCardinalityByOp(nodeId, + AllocationTags.createSingleAppAllocationTags(appId, null), + Long::max)); rmContainer.handle( new RMContainerEvent(containerId, RMContainerEventType.ACQUIRED)); @@ -466,7 +479,9 @@ public class TestRMContainerImpl { RMContainerEventType.FINISHED)); Assert.assertEquals(0, - tagsManager.getNodeCardinalityByOp(nodeId, appId, null, Long::max)); + tagsManager.getNodeCardinalityByOp(nodeId, + AllocationTags.createSingleAppAllocationTags(appId, null), + Long::max)); /* Third container: RUNNING -> FINISHED */ rmContainer = new RMContainerImpl(container, @@ -475,13 +490,17 @@ public class TestRMContainerImpl { rmContainer.setAllocationTags(ImmutableSet.of("mapper")); Assert.assertEquals(0, - tagsManager.getNodeCardinalityByOp(nodeId, appId, null, Long::max)); + tagsManager.getNodeCardinalityByOp(nodeId, + AllocationTags.createSingleAppAllocationTags(appId, null), + Long::max)); rmContainer.handle(new RMContainerEvent(containerId, RMContainerEventType.START)); Assert.assertEquals(1, - tagsManager.getNodeCardinalityByOp(nodeId, appId, null, Long::max)); + tagsManager.getNodeCardinalityByOp(nodeId, + AllocationTags.createSingleAppAllocationTags(appId, null), + Long::max)); rmContainer.handle( new RMContainerEvent(containerId, RMContainerEventType.ACQUIRED)); @@ -494,7 +513,9 @@ public class TestRMContainerImpl { RMContainerEventType.FINISHED)); Assert.assertEquals(0, - tagsManager.getNodeCardinalityByOp(nodeId, appId, null, Long::max)); + tagsManager.getNodeCardinalityByOp(nodeId, + AllocationTags.createSingleAppAllocationTags(appId, null), + Long::max)); /* Fourth container: NEW -> RECOVERED */ rmContainer = new RMContainerImpl(container, @@ -503,7 +524,9 @@ public class TestRMContainerImpl { rmContainer.setAllocationTags(ImmutableSet.of("mapper")); Assert.assertEquals(0, - tagsManager.getNodeCardinalityByOp(nodeId, appId, null, Long::max)); + 
tagsManager.getNodeCardinalityByOp(nodeId, + AllocationTags.createSingleAppAllocationTags(appId, null), + Long::max)); NMContainerStatus containerStatus = NMContainerStatus .newInstance(containerId, 0, ContainerState.NEW, @@ -514,6 +537,8 @@ public class TestRMContainerImpl { .handle(new RMContainerRecoverEvent(containerId, containerStatus)); Assert.assertEquals(1, - tagsManager.getNodeCardinalityByOp(nodeId, appId, null, Long::max)); + tagsManager.getNodeCardinalityByOp(nodeId, + AllocationTags.createSingleAppAllocationTags(appId, null), + Long::max)); } } --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org