This is an automated email from the ASF dual-hosted git repository. slfan1989 pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push: new df0381f2367 YARN-11536. [Federation] Router CLI Supports Batch Save the SubClusterPolicyConfiguration Of Queues. (#5862) Contributed by Shilun Fan. df0381f2367 is described below commit df0381f236789c6fc63a3dbbd50ce3b159fa2ff3 Author: slfan1989 <55643692+slfan1...@users.noreply.github.com> AuthorDate: Thu Aug 10 10:27:36 2023 +0800 YARN-11536. [Federation] Router CLI Supports Batch Save the SubClusterPolicyConfiguration Of Queues. (#5862) Contributed by Shilun Fan. Reviewed-by: Inigo Goiri <inigo...@apache.org> Signed-off-by: Shilun Fan <slfan1...@apache.org> --- .../api/ResourceManagerAdministrationProtocol.java | 16 +- .../BatchSaveFederationQueuePoliciesRequest.java | 53 +++++ .../BatchSaveFederationQueuePoliciesResponse.java | 47 ++++ .../api/protocolrecords/FederationQueueWeight.java | 41 ++++ .../resourcemanager_administration_protocol.proto | 1 + ...arn_server_resourcemanager_service_protos.proto | 8 + .../src/main/proto/yarn_protos.proto | 2 + .../apache/hadoop/yarn/client/cli/RouterCLI.java | 251 ++++++++++++++++++++- .../hadoop/yarn/client/util/MemoryPageUtils.java | 58 +++++ .../hadoop/yarn/client/TestMemoryPageUtils.java | 59 +++++ .../hadoop/yarn/client/cli/TestRouterCLI.java | 23 ++ .../src/test/resources/federation-weights.xml | 74 ++++++ ...eManagerAdministrationProtocolPBClientImpl.java | 19 ++ ...ManagerAdministrationProtocolPBServiceImpl.java | 23 ++ ...chSaveFederationQueuePoliciesRequestPBImpl.java | 149 ++++++++++++ ...hSaveFederationQueuePoliciesResponsePBImpl.java | 99 ++++++++ .../impl/pb/FederationQueueWeightPBImpl.java | 40 ++++ .../yarn/server/MockResourceManagerFacade.java | 8 + .../yarn/server/resourcemanager/AdminService.java | 34 ++- .../hadoop/yarn/server/router/RouterMetrics.java | 34 ++- .../rmadmin/DefaultRMAdminRequestInterceptor.java | 8 + .../rmadmin/FederationRMAdminInterceptor.java | 99 +++++++- .../router/rmadmin/RouterRMAdminService.java | 9 + .../yarn/server/router/TestRouterMetrics.java | 36 +++ .../PassThroughRMAdminRequestInterceptor.java | 8 + .../rmadmin/TestFederationRMAdminInterceptor.java | 144 ++++++++++++ 26 files changed, 1330 insertions(+), 13 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ResourceManagerAdministrationProtocol.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ResourceManagerAdministrationProtocol.java index a4967960a03..1ad77e0b30e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ResourceManagerAdministrationProtocol.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ResourceManagerAdministrationProtocol.java @@ -60,7 +60,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusterReq import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusterResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePolicyRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePolicyResponse; - +import org.apache.hadoop.yarn.server.api.protocolrecords.BatchSaveFederationQueuePoliciesRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.BatchSaveFederationQueuePoliciesResponse; @Private public interface ResourceManagerAdministrationProtocol extends GetUserMappingsProtocol { @@ -189,4 +190,17 @@ public interface ResourceManagerAdministrationProtocol extends GetUserMappingsPr @Idempotent SaveFederationQueuePolicyResponse saveFederationQueuePolicy( SaveFederationQueuePolicyRequest request) throws YarnException, IOException; + + /** + * In YARN-Federation mode, this method provides a way to save queue policies in batches. + * + * @param request BatchSaveFederationQueuePolicies Request + * @return Response from batchSaveFederationQueuePolicies. + * @throws YarnException exceptions from yarn servers. + * @throws IOException if an IO error occurred. + */ + @Private + @Idempotent + BatchSaveFederationQueuePoliciesResponse batchSaveFederationQueuePolicies( + BatchSaveFederationQueuePoliciesRequest request) throws YarnException, IOException; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/BatchSaveFederationQueuePoliciesRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/BatchSaveFederationQueuePoliciesRequest.java new file mode 100644 index 00000000000..c71c61aca9f --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/BatchSaveFederationQueuePoliciesRequest.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.api.protocolrecords; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.util.Records; + +import java.util.List; + +/** + * In Federation mode, + * we will support batch save queues policies to FederationStateStore. + */ +@Private +@Unstable +public abstract class BatchSaveFederationQueuePoliciesRequest { + + @Private + @Unstable + public static BatchSaveFederationQueuePoliciesRequest newInstance( + List<FederationQueueWeight> federationQueueWeights) { + BatchSaveFederationQueuePoliciesRequest request = + Records.newRecord(BatchSaveFederationQueuePoliciesRequest.class); + request.setFederationQueueWeights(federationQueueWeights); + return request; + } + + @Public + @Unstable + public abstract List<FederationQueueWeight> getFederationQueueWeights(); + + @Private + @Unstable + public abstract void setFederationQueueWeights( + List<FederationQueueWeight> federationQueueWeights); +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/BatchSaveFederationQueuePoliciesResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/BatchSaveFederationQueuePoliciesResponse.java new file mode 100644 index 00000000000..b9d1e69c4a8 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/BatchSaveFederationQueuePoliciesResponse.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.api.protocolrecords; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.util.Records; + +@Private +@Unstable +public abstract class BatchSaveFederationQueuePoliciesResponse { + + public static BatchSaveFederationQueuePoliciesResponse newInstance() { + return Records.newRecord(BatchSaveFederationQueuePoliciesResponse.class); + } + + public static BatchSaveFederationQueuePoliciesResponse newInstance(String msg) { + BatchSaveFederationQueuePoliciesResponse response = + Records.newRecord(BatchSaveFederationQueuePoliciesResponse.class); + response.setMessage(msg); + return response; + } + + @Public + @Unstable + public abstract String getMessage(); + + @Public + @Unstable + public abstract void setMessage(String msg); +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/FederationQueueWeight.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/FederationQueueWeight.java index c63ee1b713d..aa9d1145274 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/FederationQueueWeight.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/FederationQueueWeight.java @@ -75,6 +75,19 @@ public abstract class FederationQueueWeight { return federationQueueWeight; } + @Private + @Unstable + public static FederationQueueWeight newInstance(String routerWeight, + String amrmWeight, String headRoomAlpha, String queue, String policyManagerClassName) { + FederationQueueWeight federationQueueWeight = Records.newRecord(FederationQueueWeight.class); + federationQueueWeight.setRouterWeight(routerWeight); + federationQueueWeight.setAmrmWeight(amrmWeight); + federationQueueWeight.setHeadRoomAlpha(headRoomAlpha); + federationQueueWeight.setQueue(queue); + federationQueueWeight.setPolicyManagerClassName(policyManagerClassName); + return federationQueueWeight; + } + @Public @Unstable public abstract String getRouterWeight(); @@ -166,4 +179,32 @@ public abstract class FederationQueueWeight { protected static boolean isNumeric(String value) { return NumberUtils.isCreatable(value); } + + @Public + @Unstable + public abstract String getQueue(); + + @Public + @Unstable + public abstract void setQueue(String queue); + + @Public + @Unstable + public abstract String getPolicyManagerClassName(); + + @Public + @Unstable + public abstract void setPolicyManagerClassName(String policyManagerClassName); + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + builder.append("FederationQueueWeight { "); + builder.append("Queue: ").append(getQueue()).append(", "); + builder.append("RouterWeight: ").append(getRouterWeight()).append(", "); + builder.append("AmrmWeight: ").append(getAmrmWeight()).append(", "); + builder.append("PolicyManagerClassName: ").append(getPolicyManagerClassName()); + builder.append(" }"); + return builder.toString(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/resourcemanager_administration_protocol.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/resourcemanager_administration_protocol.proto index 809817fa9f9..aca7a4c0b83 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/resourcemanager_administration_protocol.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/resourcemanager_administration_protocol.proto @@ -49,4 +49,5 @@ service ResourceManagerAdministrationProtocolService { rpc mapAttributesToNodes(NodesToAttributesMappingRequestProto) returns (NodesToAttributesMappingResponseProto); rpc deregisterSubCluster(DeregisterSubClusterRequestProto) returns (DeregisterSubClusterResponseProto); rpc saveFederationQueuePolicy(SaveFederationQueuePolicyRequestProto) returns (SaveFederationQueuePolicyResponseProto); + rpc batchSaveFederationQueuePolicies(BatchSaveFederationQueuePoliciesRequestProto) returns (BatchSaveFederationQueuePoliciesResponseProto); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto index 4e330fb1e63..06e11f913b6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto @@ -180,6 +180,14 @@ message SaveFederationQueuePolicyResponseProto { required string message = 1; } +message BatchSaveFederationQueuePoliciesRequestProto { + repeated FederationQueueWeightProto federationQueueWeights = 1; +} + +message BatchSaveFederationQueuePoliciesResponseProto { + required string message = 1; +} + ////////////////////////////////////////////////////////////////// ///////////// RM Failover related records //////////////////////// ////////////////////////////////////////////////////////////////// diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto index 847919091cf..71c102f4f85 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto @@ -444,6 +444,8 @@ message FederationQueueWeightProto { optional string routerWeight = 1; optional string amrmWeight = 2; optional string headRoomAlpha = 3; + optional string queue = 4; + optional string policyManagerClassName = 5; } //////////////////////////////////////////////////////////////////////// diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RouterCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RouterCLI.java index 1c5873f28bd..788ef8feb1e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RouterCLI.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RouterCLI.java @@ -32,6 +32,7 @@ import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import org.apache.hadoop.yarn.client.ClientRMProxy; import org.apache.hadoop.yarn.client.util.FormattingCLIUtils; +import org.apache.hadoop.yarn.client.util.MemoryPageUtils; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol; @@ -40,16 +41,28 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusterRes import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusters; import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePolicyRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePolicyResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.BatchSaveFederationQueuePoliciesRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.BatchSaveFederationQueuePoliciesResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.FederationQueueWeight; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - +import org.w3c.dom.Document; +import org.w3c.dom.Element; +import org.w3c.dom.Node; +import org.w3c.dom.NodeList; +import org.xml.sax.SAXException; + +import javax.xml.parsers.DocumentBuilder; +import javax.xml.parsers.DocumentBuilderFactory; +import javax.xml.parsers.ParserConfigurationException; +import java.io.File; import java.io.IOException; import java.io.OutputStreamWriter; import java.io.PrintWriter; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import java.util.Arrays; +import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -71,7 +84,8 @@ public class RouterCLI extends Configured implements Tool { "set the state of the subCluster to SC_LOST.")) // Command2: policy .put("-policy", new UsageInfo( - "[-s|--save [queue;router weight;amrm weight;headroomalpha]]", + "[-s|--save [queue;router weight;amrm weight;headroomalpha]] " + + "[-bs|--batch-save [--format xml] [-f|--input-file fileName]]", "We provide a set of commands for Policy:" + " Include list policies, save policies, batch save policies. " + " (Note: The policy type will be directly read from the" + @@ -102,8 +116,23 @@ public class RouterCLI extends Configured implements Tool { // Command2: policy // save policy private static final String OPTION_S = "s"; + private static final String OPTION_BATCH_S = "bs"; private static final String OPTION_SAVE = "save"; + private static final String OPTION_BATCH_SAVE = "batch-save"; + private static final String OPTION_FORMAT = "format"; + private static final String OPTION_FILE = "f"; + private static final String OPTION_INPUT_FILE = "input-file"; + private static final String CMD_POLICY = "-policy"; + private static final String FORMAT_XML = "xml"; + private static final String FORMAT_JSON = "json"; + private static final String XML_TAG_SUBCLUSTERIDINFO = "subClusterIdInfo"; + private static final String XML_TAG_AMRMPOLICYWEIGHTS = "amrmPolicyWeights"; + private static final String XML_TAG_ROUTERPOLICYWEIGHTS = "routerPolicyWeights"; + private static final String XML_TAG_HEADROOMALPHA = "headroomAlpha"; + private static final String XML_TAG_FEDERATION_WEIGHTS = "federationWeights"; + private static final String XML_TAG_QUEUE = "queue"; + private static final String XML_TAG_NAME = "name"; public RouterCLI() { super(); @@ -161,7 +190,8 @@ public class RouterCLI extends Configured implements Tool { .append("The full syntax is: \n\n") .append("routeradmin\n") .append(" [-deregisterSubCluster [-sc|--subClusterId [subCluster Id]]\n") - .append(" [-policy [-s|--save [queue;router weight;amrm weight;headroomalpha]]\n") + .append(" [-policy [-s|--save [queue;router weight;amrm weight;headroomalpha] " + + "[-bs|--batch-save [--format xml,json] [-f|--input-file fileName]]]\n") .append(" [-help [cmd]]").append("\n"); StringBuilder helpBuilder = new StringBuilder(); System.out.println(summary); @@ -304,7 +334,23 @@ public class RouterCLI extends Configured implements Tool { "We will save the policy information of the queue, " + "including queue and weight information"); saveOpt.setOptionalArg(true); + Option batchSaveOpt = new Option(OPTION_BATCH_S, OPTION_BATCH_SAVE, false, + "We will save queue policies in bulk, " + + "where users can provide XML or JSON files containing the policies. " + + "This command will parse the file contents and store the results " + + "in the FederationStateStore."); + Option formatOpt = new Option(null, "format", true, + "Users can specify the file format using this option. " + + "Currently, there are one supported file formats: XML." + + "These files contain the policy information for storing queue policies."); + Option fileOpt = new Option("f", "input-file", true, + "The location of the input configuration file. "); + formatOpt.setOptionalArg(true); + opts.addOption(saveOpt); + opts.addOption(batchSaveOpt); + opts.addOption(formatOpt); + opts.addOption(fileOpt); // Parse command line arguments. CommandLine cliParser; @@ -317,12 +363,42 @@ public class RouterCLI extends Configured implements Tool { } // Try to parse the cmd save. + // Save a single queue policy if (cliParser.hasOption(OPTION_S) || cliParser.hasOption(OPTION_SAVE)) { String policy = cliParser.getOptionValue(OPTION_S); if (StringUtils.isBlank(policy)) { policy = cliParser.getOptionValue(OPTION_SAVE); } return handleSavePolicy(policy); + } else if (cliParser.hasOption(OPTION_BATCH_S) || cliParser.hasOption(OPTION_BATCH_SAVE)) { + // Save Queue Policies in Batches + // Determine whether the file format is accurate, XML or JSON format. + // If it is not XML or JSON, we will directly prompt the user with an error message. + String format = null; + if (cliParser.hasOption(OPTION_FORMAT)) { + format = cliParser.getOptionValue(OPTION_FORMAT); + if (StringUtils.isBlank(format) || + !StringUtils.equalsAnyIgnoreCase(format, FORMAT_XML)) { + System.out.println("We currently only support policy configuration files " + + "in XML formats."); + return EXIT_ERROR; + } + } + + // Parse configuration file path. + String filePath = null; + if (cliParser.hasOption(OPTION_FILE) || cliParser.hasOption(OPTION_INPUT_FILE)) { + filePath = cliParser.getOptionValue(OPTION_FILE); + if (StringUtils.isBlank(filePath)) { + filePath = cliParser.getOptionValue(OPTION_INPUT_FILE); + } + } + + // Batch SavePolicies. + return handBatchSavePolicies(format, filePath); + } else { + // printUsage + printUsage(args[0]); } return EXIT_ERROR; @@ -342,6 +418,30 @@ public class RouterCLI extends Configured implements Tool { } } + private int handBatchSavePolicies(String format, String policyFile) { + + if(StringUtils.isBlank(format)) { + LOG.error("Batch Save Federation Policies. Format is Empty."); + return EXIT_ERROR; + } + + if(StringUtils.isBlank(policyFile)) { + LOG.error("Batch Save Federation Policies. policyFile is Empty."); + return EXIT_ERROR; + } + + LOG.info("Batch Save Federation Policies. Format = {}, PolicyFile = {}.", + format, policyFile); + + switch (format) { + case FORMAT_XML: + return parseXml2PoliciesAndBatchSavePolicies(policyFile); + default: + System.out.println("We currently only support XML formats."); + return EXIT_ERROR; + } + } + /** * We will parse the policy, and it has specific formatting requirements. * @@ -384,6 +484,140 @@ public class RouterCLI extends Configured implements Tool { return request; } + /** + * Parse Policies from XML and save them in batches to FederationStateStore. + * + * We save 20 policies in one batch. + * If the user needs to save 1000 policies, it will cycle 50 times. + * + * Every time a page is saved, we will print whether a page + * has been saved successfully or failed. + * + * @param policiesXml Policies Xml Path. + * @return 0, success; 1, failed. + */ + protected int parseXml2PoliciesAndBatchSavePolicies(String policiesXml) { + try { + List<FederationQueueWeight> federationQueueWeightsList = parsePoliciesByXml(policiesXml); + MemoryPageUtils<FederationQueueWeight> memoryPageUtils = new MemoryPageUtils<>(20); + federationQueueWeightsList.forEach(federationQueueWeight -> + memoryPageUtils.addToMemory(federationQueueWeight)); + int pages = memoryPageUtils.getPages(); + for (int i = 0; i < pages; i++) { + List<FederationQueueWeight> federationQueueWeights = + memoryPageUtils.readFromMemory(i); + BatchSaveFederationQueuePoliciesRequest request = + BatchSaveFederationQueuePoliciesRequest.newInstance(federationQueueWeights); + ResourceManagerAdministrationProtocol adminProtocol = createAdminProtocol(); + BatchSaveFederationQueuePoliciesResponse response = + adminProtocol.batchSaveFederationQueuePolicies(request); + System.out.println("page <" + (i + 1) + "> : " + response.getMessage()); + } + } catch (Exception e) { + LOG.error("BatchSaveFederationQueuePolicies error", e); + } + return EXIT_ERROR; + } + + /** + * Parse FederationQueueWeight from the xml configuration file. + * <p> + * We allow users to provide an xml configuration file, + * which stores the weight information of the queue. + * + * @param policiesXml Policies Xml Path. + * @return FederationQueueWeight List. + * @throws IOException an I/O exception of some sort has occurred. + * @throws SAXException Encapsulate a general SAX error or warning. + * @throws ParserConfigurationException a serious configuration error.. + */ + protected List<FederationQueueWeight> parsePoliciesByXml(String policiesXml) + throws IOException, SAXException, ParserConfigurationException { + + List<FederationQueueWeight> weights = new ArrayList<>(); + + File xmlFile = new File(policiesXml); + DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance(); + DocumentBuilder builder = factory.newDocumentBuilder(); + Document document = builder.parse(xmlFile); + + NodeList federationsList = document.getElementsByTagName(XML_TAG_FEDERATION_WEIGHTS); + + for (int i = 0; i < federationsList.getLength(); i++) { + + Node federationNode = federationsList.item(i); + + if (federationNode.getNodeType() == Node.ELEMENT_NODE) { + Element federationElement = (Element) federationNode; + NodeList queueList = federationElement.getElementsByTagName(XML_TAG_QUEUE); + + for (int j = 0; j < queueList.getLength(); j++) { + + Node queueNode = queueList.item(j); + if (queueNode.getNodeType() == Node.ELEMENT_NODE) { + Element queueElement = (Element) queueNode; + // parse queueName. + String queueName = queueElement.getElementsByTagName(XML_TAG_NAME) + .item(0).getTextContent(); + + // parse amrmPolicyWeights / routerPolicyWeights. + String amrmWeight = parsePolicyWeightsNode(queueElement, XML_TAG_AMRMPOLICYWEIGHTS); + String routerWeight = parsePolicyWeightsNode(queueElement, XML_TAG_ROUTERPOLICYWEIGHTS); + + // parse headroomAlpha. + String headroomAlpha = queueElement.getElementsByTagName(XML_TAG_HEADROOMALPHA) + .item(0).getTextContent(); + + String policyManager = getConf().get(YarnConfiguration.FEDERATION_POLICY_MANAGER, + YarnConfiguration.DEFAULT_FEDERATION_POLICY_MANAGER); + + LOG.debug("Queue: {}, AmrmPolicyWeights: {}, RouterWeight: {}, HeadroomAlpha: {}.", + queueName, amrmWeight, routerWeight, headroomAlpha); + + FederationQueueWeight weight = FederationQueueWeight.newInstance(routerWeight, + amrmWeight, headroomAlpha, queueName, policyManager); + + weights.add(weight); + } + } + } + } + + return weights; + } + + /** + * We will parse the policyWeight information. + * + * @param queueElement xml Element. + * @param weightType weightType, including 2 types, AmrmPolicyWeight and RouterPolicyWeight. + * @return concatenated string of sub-cluster weights. + */ + private String parsePolicyWeightsNode(Element queueElement, String weightType) { + NodeList amrmPolicyWeightsList = queueElement.getElementsByTagName(weightType); + Node amrmPolicyWeightsNode = amrmPolicyWeightsList.item(0); + List<String> amRmPolicyWeights = new ArrayList<>(); + if (amrmPolicyWeightsNode.getNodeType() == Node.ELEMENT_NODE) { + Element amrmPolicyWeightsElement = (Element) amrmPolicyWeightsNode; + NodeList subClusterIdInfoList = + amrmPolicyWeightsElement.getElementsByTagName(XML_TAG_SUBCLUSTERIDINFO); + for (int i = 0; i < subClusterIdInfoList.getLength(); i++) { + Node subClusterIdInfoNode = subClusterIdInfoList.item(i); + if (subClusterIdInfoNode.getNodeType() == Node.ELEMENT_NODE) { + Element subClusterIdInfoElement = (Element) subClusterIdInfoNode; + String subClusterId = + subClusterIdInfoElement.getElementsByTagName("id").item(0).getTextContent(); + String weight = + subClusterIdInfoElement.getElementsByTagName("weight").item(0).getTextContent(); + LOG.debug("WeightType[{}] - SubCluster ID: {}, Weight: {}.", + weightType, subClusterId, weight); + amRmPolicyWeights.add(subClusterId + ":" + weight); + } + } + } + return StringUtils.join(amRmPolicyWeights, ","); + } + @Override public int run(String[] args) throws Exception { YarnConfiguration yarnConf = getConf() == null ? @@ -405,14 +639,13 @@ public class RouterCLI extends Configured implements Tool { printHelp(); } return EXIT_SUCCESS; - } - - if (CMD_DEREGISTERSUBCLUSTER.equals(cmd)) { + } else if (CMD_DEREGISTERSUBCLUSTER.equals(cmd)) { return handleDeregisterSubCluster(args); - } - - if (CMD_POLICY.equals(cmd)) { + } else if (CMD_POLICY.equals(cmd)) { return handlePolicy(args); + } else { + System.out.println("No related commands found."); + printHelp(); } return EXIT_SUCCESS; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/util/MemoryPageUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/util/MemoryPageUtils.java new file mode 100644 index 00000000000..24e688779ae --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/util/MemoryPageUtils.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.client.util; + +import java.util.ArrayList; +import java.util.List; + +/** + * This is a memory paging utility that is used to paginate a dataset. + * + * This class is designed to support batch entry queue policies. + */ +public class MemoryPageUtils<T> { + private List<T> dataList; + private int pageSize; + + /** + * MemoryPageUtils constructor. + * + * @param pageSize Number of records returned per page. + */ + public MemoryPageUtils(int pageSize) { + this.pageSize = pageSize; + this.dataList = new ArrayList<>(); + } + + public void addToMemory(T data) { + dataList.add(data); + } + + public List<T> readFromMemory(int pageNumber) { + int startIndex = pageNumber * pageSize; + int endIndex = Math.min(startIndex + pageSize, dataList.size()); + if (startIndex >= dataList.size()) { + return null; + } + return dataList.subList(startIndex, endIndex); + } + + public int getPages() { + return (dataList.size() / pageSize + 1); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestMemoryPageUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestMemoryPageUtils.java new file mode 100644 index 00000000000..4783c73d669 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestMemoryPageUtils.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.client; + +import org.apache.hadoop.yarn.client.util.MemoryPageUtils; +import org.junit.Test; + +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +/** + * The purpose of this class is to test + * whether the memory paging function is as expected. + */ +public class TestMemoryPageUtils { + + @Test + public void testMemoryPage() { + // We design such a unit test for testing pagination, and we prepare 6 pieces of policy data. + // If 1 page is followed by 5 pieces of data, we will get 2 pages. + // Page 1 will contain 5 records and page 2 will contain 1 record. + MemoryPageUtils<String> policies = new MemoryPageUtils<>(5); + policies.addToMemory("policy-1"); + policies.addToMemory("policy-2"); + policies.addToMemory("policy-3"); + policies.addToMemory("policy-4"); + policies.addToMemory("policy-5"); + policies.addToMemory("policy-6"); + + // Page 1 will return 5 records. + List<String> firstPage = policies.readFromMemory(0); + assertEquals(5, firstPage.size()); + + // Page 2 will return 1 records + List<String> secondPage = policies.readFromMemory(1); + assertEquals(1, secondPage.size()); + + // Page 10, This is a wrong number of pages, we will get null. + List<String> tenPage = policies.readFromMemory(10); + assertNull(tenPage); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestRouterCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestRouterCLI.java index c7989e11af5..476ba75263d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestRouterCLI.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestRouterCLI.java @@ -217,4 +217,27 @@ public class TestRouterCLI { args = new String[]{"-policy", "-save", "root.a;SC-1:0.1,SC-2:0.9;SC-1:0.7,SC-2:0.3;1.0"}; assertEquals(0, rmAdminCLI.run(args)); } + + @Test + public void testParsePoliciesByXml() throws Exception { + String filePath = + TestRouterCLI.class.getClassLoader().getResource("federation-weights.xml").getFile(); + List<FederationQueueWeight> federationQueueWeights = rmAdminCLI.parsePoliciesByXml(filePath); + assertNotNull(federationQueueWeights); + assertEquals(2, federationQueueWeights.size()); + + // Queue1: root.a + FederationQueueWeight queueWeight1 = federationQueueWeights.get(0); + assertNotNull(queueWeight1); + assertEquals("root.a", queueWeight1.getQueue()); + assertEquals("SC-1:0.7,SC-2:0.3", queueWeight1.getAmrmWeight()); + assertEquals("SC-1:0.6,SC-2:0.4", queueWeight1.getRouterWeight()); + + // Queue2: root.b + FederationQueueWeight queueWeight2 = federationQueueWeights.get(1); + assertNotNull(queueWeight2); + assertEquals("root.b", queueWeight2.getQueue()); + assertEquals("SC-1:0.8,SC-2:0.2", queueWeight2.getAmrmWeight()); + assertEquals("SC-1:0.6,SC-2:0.4", queueWeight2.getRouterWeight()); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/resources/federation-weights.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/resources/federation-weights.xml new file mode 100644 index 00000000000..d5fceaf35fc --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/resources/federation-weights.xml @@ -0,0 +1,74 @@ +<?xml version="1.0"?> +<?xml-stylesheet type="text/xsl" href="configuration.xsl"?> + +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> + +<federationWeights> + <weight> + <queue> + <name>root.a</name> + <amrmPolicyWeights> + <subClusterIdInfo> + <id>SC-1</id> + <weight>0.7</weight> + </subClusterIdInfo> + <subClusterIdInfo> + <id>SC-2</id> + <weight>0.3</weight> + </subClusterIdInfo> + </amrmPolicyWeights> + <routerPolicyWeights> + <subClusterIdInfo> + <id>SC-1</id> + <weight>0.6</weight> + </subClusterIdInfo> + <subClusterIdInfo> + <id>SC-2</id> + <weight>0.4</weight> + </subClusterIdInfo> + </routerPolicyWeights> + <headroomAlpha>1.0</headroomAlpha> + </queue> + </weight> + <weight> + <queue> + <name>root.b</name> + <amrmPolicyWeights> + <subClusterIdInfo> + <id>SC-1</id> + <weight>0.8</weight> + </subClusterIdInfo> + <subClusterIdInfo> + <id>SC-2</id> + <weight>0.2</weight> + </subClusterIdInfo> + </amrmPolicyWeights> + <routerPolicyWeights> + <subClusterIdInfo> + <id>SC-1</id> + <weight>0.6</weight> + </subClusterIdInfo> + <subClusterIdInfo> + <id>SC-2</id> + <weight>0.4</weight> + </subClusterIdInfo> + </routerPolicyWeights> + <headroomAlpha>1.0</headroomAlpha> + </queue> + </weight> +</federationWeights> \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/ResourceManagerAdministrationProtocolPBClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/ResourceManagerAdministrationProtocolPBClientImpl.java index 3242fe1400b..7107f8014e4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/ResourceManagerAdministrationProtocolPBClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/ResourceManagerAdministrationProtocolPBClientImpl.java @@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.Repla import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.UpdateNodeResourceRequestProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.DeregisterSubClusterRequestProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.SaveFederationQueuePolicyRequestProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.BatchSaveFederationQueuePoliciesRequestProto; import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol; import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocolPB; import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsRequest; @@ -81,6 +82,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusterReq import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusterResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePolicyRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePolicyResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.BatchSaveFederationQueuePoliciesRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.BatchSaveFederationQueuePoliciesResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.AddToClusterNodeLabelsRequestPBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.AddToClusterNodeLabelsResponsePBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.CheckForDecommissioningNodesRequestPBImpl; @@ -113,6 +116,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DeregisterSubCl import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DeregisterSubClusterResponsePBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.SaveFederationQueuePolicyRequestPBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.SaveFederationQueuePolicyResponsePBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.BatchSaveFederationQueuePoliciesRequestPBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.BatchSaveFederationQueuePoliciesResponsePBImpl; import org.apache.hadoop.thirdparty.protobuf.ServiceException; @@ -381,4 +386,18 @@ public class ResourceManagerAdministrationProtocolPBClientImpl implements Resour return null; } } + + @Override + public BatchSaveFederationQueuePoliciesResponse batchSaveFederationQueuePolicies( + BatchSaveFederationQueuePoliciesRequest request) throws YarnException, IOException { + BatchSaveFederationQueuePoliciesRequestProto requestProto = + ((BatchSaveFederationQueuePoliciesRequestPBImpl) request).getProto(); + try { + return new BatchSaveFederationQueuePoliciesResponsePBImpl( + proxy.batchSaveFederationQueuePolicies(null, requestProto)); + } catch (ServiceException e) { + RPCUtil.unwrapAndThrowException(e); + return null; + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/ResourceManagerAdministrationProtocolPBServiceImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/ResourceManagerAdministrationProtocolPBServiceImpl.java index e393523ed2e..01feef9d9fe 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/ResourceManagerAdministrationProtocolPBServiceImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/ResourceManagerAdministrationProtocolPBServiceImpl.java @@ -24,6 +24,8 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.AddToClusterNodeLabelsRequestProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.AddToClusterNodeLabelsResponseProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.BatchSaveFederationQueuePoliciesRequestProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.BatchSaveFederationQueuePoliciesResponseProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.CheckForDecommissioningNodesRequestProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.CheckForDecommissioningNodesResponseProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.GetGroupsForUserRequestProto; @@ -79,6 +81,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusterReq import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusterResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePolicyRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePolicyResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.BatchSaveFederationQueuePoliciesRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.BatchSaveFederationQueuePoliciesResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.AddToClusterNodeLabelsRequestPBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.AddToClusterNodeLabelsResponsePBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.CheckForDecommissioningNodesRequestPBImpl; @@ -111,6 +115,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DeregisterSubCl import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DeregisterSubClusterResponsePBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.SaveFederationQueuePolicyRequestPBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.SaveFederationQueuePolicyResponsePBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.BatchSaveFederationQueuePoliciesRequestPBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.BatchSaveFederationQueuePoliciesResponsePBImpl; import org.apache.hadoop.thirdparty.protobuf.RpcController; import org.apache.hadoop.thirdparty.protobuf.ServiceException; @@ -399,4 +405,21 @@ public class ResourceManagerAdministrationProtocolPBServiceImpl implements Resou throw new ServiceException(e); } } + + @Override + public BatchSaveFederationQueuePoliciesResponseProto batchSaveFederationQueuePolicies( + RpcController controller, BatchSaveFederationQueuePoliciesRequestProto proto) + throws ServiceException { + BatchSaveFederationQueuePoliciesRequest request = + new BatchSaveFederationQueuePoliciesRequestPBImpl(proto); + try { + BatchSaveFederationQueuePoliciesResponse response = + real.batchSaveFederationQueuePolicies(request); + return ((BatchSaveFederationQueuePoliciesResponsePBImpl) response).getProto(); + } catch (YarnException e) { + throw new ServiceException(e); + } catch (IOException e) { + throw new ServiceException(e); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/BatchSaveFederationQueuePoliciesRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/BatchSaveFederationQueuePoliciesRequestPBImpl.java new file mode 100644 index 00000000000..a52e47372bd --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/BatchSaveFederationQueuePoliciesRequestPBImpl.java @@ -0,0 +1,149 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb; + +import org.apache.commons.lang3.builder.EqualsBuilder; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.thirdparty.protobuf.TextFormat; +import org.apache.hadoop.yarn.proto.YarnProtos.FederationQueueWeightProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.BatchSaveFederationQueuePoliciesRequestProtoOrBuilder; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.BatchSaveFederationQueuePoliciesRequestProto; +import org.apache.hadoop.yarn.server.api.protocolrecords.BatchSaveFederationQueuePoliciesRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.FederationQueueWeight; + +import java.util.ArrayList; +import java.util.List; + +/** + * The class is responsible for batch-saving queue policies requests. + */ +@Private +@Unstable +public class BatchSaveFederationQueuePoliciesRequestPBImpl + extends BatchSaveFederationQueuePoliciesRequest { + + private BatchSaveFederationQueuePoliciesRequestProto proto = + BatchSaveFederationQueuePoliciesRequestProto.getDefaultInstance(); + private BatchSaveFederationQueuePoliciesRequestProto.Builder builder = null; + private boolean viaProto = false; + private List<FederationQueueWeight> federationQueueWeights = null; + + public BatchSaveFederationQueuePoliciesRequestPBImpl() { + this.builder = BatchSaveFederationQueuePoliciesRequestProto.newBuilder(); + } + + public BatchSaveFederationQueuePoliciesRequestPBImpl( + BatchSaveFederationQueuePoliciesRequestProto proto) { + this.proto = proto; + viaProto = true; + } + + private synchronized void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = BatchSaveFederationQueuePoliciesRequestProto.newBuilder(proto); + } + viaProto = false; + } + + private void mergeLocalToProto() { + if (viaProto) { + maybeInitBuilder(); + } + if (this.federationQueueWeights != null) { + for (FederationQueueWeight federationQueueWeight : federationQueueWeights) { + FederationQueueWeightPBImpl federationQueueWeightPBImpl = + (FederationQueueWeightPBImpl) federationQueueWeight; + builder.addFederationQueueWeights(federationQueueWeightPBImpl.getProto()); + } + } + proto = builder.build(); + viaProto = true; + } + + public BatchSaveFederationQueuePoliciesRequestProto getProto() { + mergeLocalToProto(); + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + private void initDeregisterSubClustersMapping() { + if (this.federationQueueWeights != null) { + return; + } + + BatchSaveFederationQueuePoliciesRequestProtoOrBuilder p = viaProto ? proto : builder; + List<FederationQueueWeightProto> batchSaveFederationQueuePoliciesProtoList = + p.getFederationQueueWeightsList(); + + List<FederationQueueWeight> attributes = new ArrayList<>(); + if (batchSaveFederationQueuePoliciesProtoList == null || + batchSaveFederationQueuePoliciesProtoList.size() == 0) { + this.federationQueueWeights = attributes; + return; + } + + for (FederationQueueWeightProto federationQueueWeightProto : + batchSaveFederationQueuePoliciesProtoList) { + attributes.add(new FederationQueueWeightPBImpl(federationQueueWeightProto)); + } + + this.federationQueueWeights = attributes; + } + + @Override + public List<FederationQueueWeight> getFederationQueueWeights() { + initDeregisterSubClustersMapping(); + return this.federationQueueWeights; + } + + @Override + public void setFederationQueueWeights(List<FederationQueueWeight> pFederationQueueWeights) { + if (federationQueueWeights == null) { + federationQueueWeights = new ArrayList<>(); + } + if(federationQueueWeights == null) { + throw new IllegalArgumentException("federationQueueWeights cannot be null"); + } + federationQueueWeights.clear(); + federationQueueWeights.addAll(pFederationQueueWeights); + } + + @Override + public int hashCode() { + return getProto().hashCode(); + } + + @Override + public boolean equals(Object other) { + if (!(other instanceof BatchSaveFederationQueuePoliciesRequest)) { + return false; + } + + BatchSaveFederationQueuePoliciesRequestPBImpl otherImpl = this.getClass().cast(other); + return new EqualsBuilder() + .append(this.getProto(), otherImpl.getProto()) + .isEquals(); + } + + @Override + public String toString() { + return TextFormat.shortDebugString(getProto()); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/BatchSaveFederationQueuePoliciesResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/BatchSaveFederationQueuePoliciesResponsePBImpl.java new file mode 100644 index 00000000000..80653d4177b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/BatchSaveFederationQueuePoliciesResponsePBImpl.java @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb; + +import org.apache.hadoop.thirdparty.protobuf.TextFormat; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.BatchSaveFederationQueuePoliciesResponseProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.BatchSaveFederationQueuePoliciesResponseProtoOrBuilder; +import org.apache.hadoop.yarn.server.api.protocolrecords.BatchSaveFederationQueuePoliciesResponse; + +/** + * The class is responsible for batch-saving queue policies responses. + */ +public class BatchSaveFederationQueuePoliciesResponsePBImpl + extends BatchSaveFederationQueuePoliciesResponse { + + private BatchSaveFederationQueuePoliciesResponseProto proto = + BatchSaveFederationQueuePoliciesResponseProto.getDefaultInstance(); + private BatchSaveFederationQueuePoliciesResponseProto.Builder builder = null; + private boolean viaProto = false; + + public BatchSaveFederationQueuePoliciesResponsePBImpl() { + builder = BatchSaveFederationQueuePoliciesResponseProto.newBuilder(); + } + + public BatchSaveFederationQueuePoliciesResponsePBImpl( + BatchSaveFederationQueuePoliciesResponseProto proto) { + this.proto = proto; + viaProto = true; + } + + @Override + public int hashCode() { + return getProto().hashCode(); + } + + @Override + public boolean equals(Object other) { + if (other == null) { + return false; + } + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getProto().equals(this.getClass().cast(other).getProto()); + } + return false; + } + + public BatchSaveFederationQueuePoliciesResponseProto getProto() { + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + @Override + public String getMessage() { + BatchSaveFederationQueuePoliciesResponseProtoOrBuilder p = viaProto ? proto : builder; + boolean hasMessage = p.hasMessage(); + if (hasMessage) { + return p.getMessage(); + } + return null; + } + + @Override + public void setMessage(String msg) { + maybeInitBuilder(); + if (msg == null) { + builder.clearMessage(); + return; + } + builder.setMessage(msg); + } + + private synchronized void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = BatchSaveFederationQueuePoliciesResponseProto.newBuilder(proto); + } + viaProto = false; + } + + @Override + public String toString() { + return TextFormat.shortDebugString(getProto()); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/FederationQueueWeightPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/FederationQueueWeightPBImpl.java index 4ca7f783bd9..0c9aa711ba7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/FederationQueueWeightPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/FederationQueueWeightPBImpl.java @@ -114,6 +114,46 @@ public class FederationQueueWeightPBImpl extends FederationQueueWeight { builder.setHeadRoomAlpha(headRoomAlpha); } + @Override + public String getQueue() { + FederationQueueWeightProtoOrBuilder p = this.viaProto ? this.proto : this.builder; + boolean hasQueue = p.hasQueue(); + if (hasQueue) { + return p.getQueue(); + } + return null; + } + + @Override + public void setQueue(String queue) { + maybeInitBuilder(); + if (queue == null) { + builder.clearQueue(); + return; + } + builder.setQueue(queue); + } + + @Override + public String getPolicyManagerClassName() { + FederationQueueWeightProtoOrBuilder p = this.viaProto ? this.proto : this.builder; + boolean hasPolicyManagerClassName = p.hasPolicyManagerClassName(); + if (hasPolicyManagerClassName) { + return p.getPolicyManagerClassName(); + } + return null; + } + + @Override + public void setPolicyManagerClassName(String policyManagerClassName) { + maybeInitBuilder(); + if (policyManagerClassName == null) { + builder.clearPolicyManagerClassName(); + return; + } + builder.setPolicyManagerClassName(policyManagerClassName); + } + @Override public int hashCode() { return getProto().hashCode(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java index 14bb941c9d6..0fc52898434 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java @@ -177,6 +177,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusterReq import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusterResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePolicyRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePolicyResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.BatchSaveFederationQueuePoliciesRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.BatchSaveFederationQueuePoliciesResponse; import org.apache.hadoop.thirdparty.com.google.common.base.Strings; /** @@ -972,6 +974,12 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol, return null; } + @Override + public BatchSaveFederationQueuePoliciesResponse batchSaveFederationQueuePolicies( + BatchSaveFederationQueuePoliciesRequest request) throws YarnException, IOException { + return null; + } + @VisibleForTesting public HashMap<ApplicationId, List<ContainerId>> getApplicationContainerIdMap() { return applicationContainerIdMap; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java index d9eade8fe2d..7f1c7b93e94 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java @@ -99,6 +99,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusterReq import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusterResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePolicyRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePolicyResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.BatchSaveFederationQueuePoliciesRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.BatchSaveFederationQueuePoliciesResponse; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NodeLabelsUtils; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem; import org.apache.hadoop.yarn.server.resourcemanager.resource.DynamicResourceConfiguration; @@ -1056,11 +1058,41 @@ public class AdminService extends CompositeService implements "Please call Router's deregisterSubCluster to set."); } + /** + * In YARN-Federation mode, We will be storing the Policy information for Queues. + * + * RM does not support saveFederationQueuePolicy, + * saveFederationQueuePolicy is supported by Router. + * + * @param request saveFederationQueuePolicy Request + * @return Response from saveFederationQueuePolicy. + * @throws YarnException exceptions from yarn servers. + * @throws IOException if an IO error occurred. + */ @Override public SaveFederationQueuePolicyResponse saveFederationQueuePolicy( SaveFederationQueuePolicyRequest request) throws YarnException, IOException { throw new YarnException("It is not allowed to call the RM's saveFederationQueuePolicy. " + - " Please call Router's deregisterSubCluster to set Policy."); + " Please call Router's saveFederationQueuePolicy to set Policy."); + } + + /** + * In YARN-Federation mode, this method provides a way to save queue policies in batches. + * + * RM does not support batchSaveFederationQueuePolicies, + * batchSaveFederationQueuePolicies is supported by Router. + * + * @param request BatchSaveFederationQueuePolicies Request + * @return Response from batchSaveFederationQueuePolicies. + * @throws YarnException exceptions from yarn servers. + * @throws IOException if an IO error occurred. + */ + @Override + public BatchSaveFederationQueuePoliciesResponse batchSaveFederationQueuePolicies( + BatchSaveFederationQueuePoliciesRequest request) throws YarnException, IOException { + throw new YarnException("It is not allowed to call the RM's " + + " batchSaveFederationQueuePolicies. " + + " Please call Router's batchSaveFederationQueuePolicies to set Policies."); } private void validateAttributesExists( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java index f9769f7026c..6503765ede1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java @@ -151,6 +151,8 @@ public final class RouterMetrics { private MutableGaugeInt numDeregisterSubClusterFailedRetrieved; @Metric("# of saveFederationQueuePolicy failed to be retrieved") private MutableGaugeInt numSaveFederationQueuePolicyFailedRetrieved; + @Metric("# of batchSaveFederationQueuePolicies failed to be retrieved") + private MutableGaugeInt numBatchSaveFederationQueuePoliciesFailedRetrieved; @Metric("# of refreshAdminAcls failed to be retrieved") private MutableGaugeInt numRefreshAdminAclsFailedRetrieved; @Metric("# of refreshServiceAcls failed to be retrieved") @@ -299,6 +301,8 @@ public final class RouterMetrics { private MutableRate totalSucceededDeregisterSubClusterRetrieved; @Metric("Total number of successful Retrieved SaveFederationQueuePolicy and latency(ms)") private MutableRate totalSucceededSaveFederationQueuePolicyRetrieved; + @Metric("Total number of successful Retrieved BatchSaveFederationQueuePolicies and latency(ms)") + private MutableRate totalSucceededBatchSaveFederationQueuePoliciesRetrieved; @Metric("Total number of successful Retrieved RefreshAdminAcls and latency(ms)") private MutableRate totalSucceededRefreshAdminAclsRetrieved; @Metric("Total number of successful Retrieved RefreshServiceAcls and latency(ms)") @@ -386,6 +390,7 @@ public final class RouterMetrics { private MutableQuantiles refreshUserToGroupsMappingsLatency; private MutableQuantiles refreshDeregisterSubClusterLatency; private MutableQuantiles saveFederationQueuePolicyLatency; + private MutableQuantiles batchSaveFederationQueuePoliciesLatency; private MutableQuantiles refreshAdminAclsLatency; private MutableQuantiles refreshServiceAclsLatency; private MutableQuantiles replaceLabelsOnNodesLatency; @@ -598,7 +603,11 @@ public final class RouterMetrics { "latency of deregister subcluster timeouts", "ops", "latency", 10); saveFederationQueuePolicyLatency = registry.newQuantiles("saveFederationQueuePolicyLatency", - "latency of refresh subcluster timeouts", "ops", "latency", 10); + "latency of save federation queue policy timeouts", "ops", "latency", 10); + + batchSaveFederationQueuePoliciesLatency = registry.newQuantiles( + "batchSaveFederationQueuePoliciesLatency", + "latency of batch save federationqueuepolicies timeouts", "ops", "latency", 10); refreshAdminAclsLatency = registry.newQuantiles("refreshAdminAclsLatency", "latency of refresh admin acls timeouts", "ops", "latency", 10); @@ -934,6 +943,11 @@ public final class RouterMetrics { return totalSucceededSaveFederationQueuePolicyRetrieved.lastStat().numSamples(); } + @VisibleForTesting + public long getNumSucceededBatchSaveFederationQueuePoliciesRetrieved() { + return totalSucceededBatchSaveFederationQueuePoliciesRetrieved.lastStat().numSamples(); + } + @VisibleForTesting public long getNumSucceededRefreshAdminAclsRetrieved() { return totalSucceededRefreshAdminAclsRetrieved.lastStat().numSamples(); @@ -1284,6 +1298,11 @@ public final class RouterMetrics { return totalSucceededSaveFederationQueuePolicyRetrieved.lastStat().mean(); } + @VisibleForTesting + public double getLatencySucceededBatchSaveFederationQueuePoliciesRetrieved() { + return totalSucceededBatchSaveFederationQueuePoliciesRetrieved.lastStat().mean(); + } + @VisibleForTesting public double getLatencySucceededRefreshAdminAclsRetrieved() { return totalSucceededRefreshAdminAclsRetrieved.lastStat().mean(); @@ -1583,6 +1602,10 @@ public final class RouterMetrics { return numSaveFederationQueuePolicyFailedRetrieved.value(); } + public int getBatchSaveFederationQueuePoliciesFailedRetrieved() { + return numBatchSaveFederationQueuePoliciesFailedRetrieved.value(); + } + public int getNumRefreshAdminAclsFailedRetrieved() { return numRefreshAdminAclsFailedRetrieved.value(); } @@ -1940,6 +1963,11 @@ public final class RouterMetrics { saveFederationQueuePolicyLatency.add(duration); } + public void succeededBatchSaveFederationQueuePoliciesRetrieved(long duration) { + totalSucceededBatchSaveFederationQueuePoliciesRetrieved.add(duration); + batchSaveFederationQueuePoliciesLatency.add(duration); + } + public void succeededRefreshAdminAclsRetrieved(long duration) { totalSucceededRefreshAdminAclsRetrieved.add(duration); refreshAdminAclsLatency.add(duration); @@ -2222,6 +2250,10 @@ public final class RouterMetrics { numSaveFederationQueuePolicyFailedRetrieved.incr(); } + public void incrBatchSaveFederationQueuePoliciesFailedRetrieved() { + numBatchSaveFederationQueuePoliciesFailedRetrieved.incr(); + } + public void incrRefreshAdminAclsFailedRetrieved() { numRefreshAdminAclsFailedRetrieved.incr(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/DefaultRMAdminRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/DefaultRMAdminRequestInterceptor.java index 20aa0dda5fd..ed0698b1a81 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/DefaultRMAdminRequestInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/DefaultRMAdminRequestInterceptor.java @@ -60,6 +60,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusterReq import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusterResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePolicyRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePolicyResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.BatchSaveFederationQueuePoliciesRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.BatchSaveFederationQueuePoliciesResponse; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -217,4 +219,10 @@ public class DefaultRMAdminRequestInterceptor SaveFederationQueuePolicyRequest request) throws YarnException, IOException { return rmAdminProxy.saveFederationQueuePolicy(request); } + + @Override + public BatchSaveFederationQueuePoliciesResponse batchSaveFederationQueuePolicies( + BatchSaveFederationQueuePoliciesRequest request) throws YarnException, IOException { + return rmAdminProxy.batchSaveFederationQueuePolicies(request); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/FederationRMAdminInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/FederationRMAdminInterceptor.java index 8125d4eb2b9..5c872608342 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/FederationRMAdminInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/FederationRMAdminInterceptor.java @@ -66,6 +66,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusters; import org.apache.hadoop.yarn.server.api.protocolrecords.FederationQueueWeight; import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePolicyRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePolicyResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.BatchSaveFederationQueuePoliciesRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.BatchSaveFederationQueuePoliciesResponse; import org.apache.hadoop.yarn.server.federation.failover.FederationProxyProviderUtil; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo; @@ -950,6 +952,100 @@ public class FederationRMAdminInterceptor extends AbstractRMAdminRequestIntercep throw new YarnException("Unable to saveFederationQueuePolicy."); } + /** + * Batch Save the Queue Policies for the Federation. + * + * @param request BatchSaveFederationQueuePolicies Request + * @return Response from batchSaveFederationQueuePolicies. + * @throws YarnException exceptions from yarn servers. + * @throws IOException if an IO error occurred. + */ + @Override + public BatchSaveFederationQueuePoliciesResponse batchSaveFederationQueuePolicies( + BatchSaveFederationQueuePoliciesRequest request) throws YarnException, IOException { + + // Parameter validation. + if (request == null) { + routerMetrics.incrBatchSaveFederationQueuePoliciesFailedRetrieved(); + RouterServerUtil.logAndThrowException( + "Missing BatchSaveFederationQueuePoliciesRequest request.", null); + } + + List<FederationQueueWeight> federationQueueWeights = request.getFederationQueueWeights(); + if (federationQueueWeights == null) { + routerMetrics.incrBatchSaveFederationQueuePoliciesFailedRetrieved(); + RouterServerUtil.logAndThrowException("Missing FederationQueueWeights information.", null); + } + + try { + long startTime = clock.getTime(); + for (FederationQueueWeight federationQueueWeight : federationQueueWeights) { + saveFederationQueuePolicy(federationQueueWeight); + } + long stopTime = clock.getTime(); + routerMetrics.succeededBatchSaveFederationQueuePoliciesRetrieved(stopTime - startTime); + return BatchSaveFederationQueuePoliciesResponse.newInstance("batch save policies success."); + } catch (Exception e) { + routerMetrics.incrBatchSaveFederationQueuePoliciesFailedRetrieved(); + RouterServerUtil.logAndThrowException(e, + "Unable to batchSaveFederationQueuePolicies due to exception. " + e.getMessage()); + } + + routerMetrics.incrBatchSaveFederationQueuePoliciesFailedRetrieved(); + throw new YarnException("Unable to batchSaveFederationQueuePolicies."); + } + + /** + * Save FederationQueuePolicy. + * + * @param federationQueueWeight queue weight. + * @throws YarnException exceptions from yarn servers. + */ + private void saveFederationQueuePolicy(FederationQueueWeight federationQueueWeight) + throws YarnException { + + // Step1, Check whether the weight setting of the queue is as expected. + String queue = federationQueueWeight.getQueue(); + String policyManagerClassName = federationQueueWeight.getPolicyManagerClassName(); + + if (StringUtils.isBlank(queue)) { + RouterServerUtil.logAndThrowException("Missing Queue information.", null); + } + + if (StringUtils.isBlank(policyManagerClassName)) { + RouterServerUtil.logAndThrowException("Missing PolicyManagerClassName information.", null); + } + + String amRmWeight = federationQueueWeight.getAmrmWeight(); + FederationQueueWeight.checkSubClusterQueueWeightRatioValid(amRmWeight); + + String routerWeight = federationQueueWeight.getRouterWeight(); + FederationQueueWeight.checkSubClusterQueueWeightRatioValid(routerWeight); + + String headRoomAlpha = federationQueueWeight.getHeadRoomAlpha(); + FederationQueueWeight.checkHeadRoomAlphaValid(headRoomAlpha); + + // Step2, parse amRMPolicyWeights. + Map<SubClusterIdInfo, Float> amRMPolicyWeights = getSubClusterWeightMap(amRmWeight); + LOG.debug("amRMPolicyWeights = {}.", amRMPolicyWeights); + + // Step3, parse routerPolicyWeights. + Map<SubClusterIdInfo, Float> routerPolicyWeights = getSubClusterWeightMap(routerWeight); + LOG.debug("routerWeights = {}.", amRMPolicyWeights); + + // Step4, Initialize WeightedPolicyInfo. + WeightedPolicyInfo weightedPolicyInfo = new WeightedPolicyInfo(); + weightedPolicyInfo.setHeadroomAlpha(Float.parseFloat(headRoomAlpha)); + weightedPolicyInfo.setAMRMPolicyWeights(amRMPolicyWeights); + weightedPolicyInfo.setRouterPolicyWeights(routerPolicyWeights); + + // Step5, Set SubClusterPolicyConfiguration. + SubClusterPolicyConfiguration policyConfiguration = + SubClusterPolicyConfiguration.newInstance(queue, policyManagerClassName, + weightedPolicyInfo.toByteBuffer()); + federationFacade.setPolicyConfiguration(policyConfiguration); + } + /** * Get the Map of SubClusterWeight. * @@ -961,8 +1057,9 @@ public class FederationRMAdminInterceptor extends AbstractRMAdminRequestIntercep * * @param policyWeight policyWeight. * @return Map of SubClusterWeight. + * @throws YarnException exceptions from yarn servers. */ - private Map<SubClusterIdInfo, Float> getSubClusterWeightMap(String policyWeight) + protected Map<SubClusterIdInfo, Float> getSubClusterWeightMap(String policyWeight) throws YarnException { FederationQueueWeight.checkSubClusterQueueWeightRatioValid(policyWeight); Map<SubClusterIdInfo, Float> result = new HashMap<>(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/RouterRMAdminService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/RouterRMAdminService.java index 3c63ee9bd7f..3b760e28f78 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/RouterRMAdminService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/RouterRMAdminService.java @@ -67,6 +67,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusterReq import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusterResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePolicyRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePolicyResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.BatchSaveFederationQueuePoliciesRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.BatchSaveFederationQueuePoliciesResponse; import org.apache.hadoop.yarn.server.router.RouterServerUtil; import org.apache.hadoop.yarn.server.router.security.authorize.RouterPolicyProvider; import org.apache.hadoop.yarn.util.LRUCacheHashMap; @@ -401,4 +403,11 @@ public class RouterRMAdminService extends AbstractService RequestInterceptorChainWrapper pipeline = getInterceptorChain(); return pipeline.getRootInterceptor().saveFederationQueuePolicy(request); } + + @Override + public BatchSaveFederationQueuePoliciesResponse batchSaveFederationQueuePolicies( + BatchSaveFederationQueuePoliciesRequest request) throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().batchSaveFederationQueuePolicies(request); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java index c53ecdd472e..f62ddb27962 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java @@ -628,6 +628,11 @@ public class TestRouterMetrics { LOG.info("Mocked: failed refreshClusterMaxPriority call"); metrics.incrSaveFederationQueuePolicyFailedRetrieved(); } + + public void getBatchSaveFederationQueuePoliciesFailedRetrieved() { + LOG.info("Mocked: failed BatchSaveFederationQueuePolicies call"); + metrics.incrBatchSaveFederationQueuePoliciesFailedRetrieved(); + } } // Records successes for all calls @@ -963,6 +968,12 @@ public class TestRouterMetrics { duration); metrics.succeededSaveFederationQueuePolicyRetrieved(duration); } + + public void getBatchSaveFederationQueuePoliciesRetrieved(long duration) { + LOG.info("Mocked: successful BatchSaveFederationQueuePoliciesRetrieved " + + " call with duration {}", duration); + metrics.succeededBatchSaveFederationQueuePoliciesRetrieved(duration); + } } @Test @@ -2241,4 +2252,29 @@ public class TestRouterMetrics { Assert.assertEquals(225, metrics.getLatencySucceededSaveFederationQueuePolicyRetrieved(), ASSERT_DOUBLE_DELTA); } + + @Test + public void testGetBatchSaveFederationQueuePoliciesFailedRetrieved() { + long totalBadBefore = metrics.getBatchSaveFederationQueuePoliciesFailedRetrieved(); + badSubCluster.getBatchSaveFederationQueuePoliciesFailedRetrieved(); + Assert.assertEquals(totalBadBefore + 1, + metrics.getBatchSaveFederationQueuePoliciesFailedRetrieved()); + } + + @Test + public void testGetBatchSaveFederationQueuePoliciesRetrieved() { + long totalGoodBefore = metrics.getNumSucceededBatchSaveFederationQueuePoliciesRetrieved(); + goodSubCluster.getBatchSaveFederationQueuePoliciesRetrieved(150); + Assert.assertEquals(totalGoodBefore + 1, + metrics.getNumSucceededBatchSaveFederationQueuePoliciesRetrieved()); + Assert.assertEquals(150, + metrics.getLatencySucceededBatchSaveFederationQueuePoliciesRetrieved(), + ASSERT_DOUBLE_DELTA); + goodSubCluster.getBatchSaveFederationQueuePoliciesRetrieved(300); + Assert.assertEquals(totalGoodBefore + 2, + metrics.getNumSucceededBatchSaveFederationQueuePoliciesRetrieved()); + Assert.assertEquals(225, + metrics.getLatencySucceededBatchSaveFederationQueuePoliciesRetrieved(), + ASSERT_DOUBLE_DELTA); + } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/PassThroughRMAdminRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/PassThroughRMAdminRequestInterceptor.java index da44923128a..263926e728b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/PassThroughRMAdminRequestInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/PassThroughRMAdminRequestInterceptor.java @@ -54,6 +54,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusterReq import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusterResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePolicyRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePolicyResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.BatchSaveFederationQueuePoliciesRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.BatchSaveFederationQueuePoliciesResponse; /** * Mock interceptor that does not do anything other than forwarding it to the @@ -169,4 +171,10 @@ public class PassThroughRMAdminRequestInterceptor SaveFederationQueuePolicyRequest request) throws YarnException, IOException { return getNextInterceptor().saveFederationQueuePolicy(request); } + + @Override + public BatchSaveFederationQueuePoliciesResponse batchSaveFederationQueuePolicies( + BatchSaveFederationQueuePoliciesRequest request) throws YarnException, IOException { + return getNextInterceptor().batchSaveFederationQueuePolicies(request); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/TestFederationRMAdminInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/TestFederationRMAdminInterceptor.java index 2c4cda2d2a9..6bdf6009a2e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/TestFederationRMAdminInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/TestFederationRMAdminInterceptor.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.router.rmadmin; +import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.test.LambdaTestUtils; @@ -59,6 +60,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.NodeToAttributes; import org.apache.hadoop.yarn.server.api.protocolrecords.FederationQueueWeight; import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePolicyRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePolicyResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.BatchSaveFederationQueuePoliciesRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.BatchSaveFederationQueuePoliciesResponse; import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo; import org.apache.hadoop.yarn.server.federation.policies.manager.WeightedLocalityPolicyManager; import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore; @@ -74,6 +77,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.nio.ByteBuffer; +import java.text.DecimalFormat; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -81,6 +85,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.HashSet; +import java.util.Random; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; @@ -700,4 +705,143 @@ public class TestFederationRMAdminInterceptor extends BaseRouterRMAdminTest { assertNotNull(sc2AMRMWeight); assertEquals(0.4f, sc2AMRMWeight.floatValue(), 0.00001); } + + @Test + public void testBatchSaveFederationQueuePoliciesRequest() throws IOException, YarnException { + + // subClusters + List<String> subClusterLists = new ArrayList<>(); + subClusterLists.add("SC-1"); + subClusterLists.add("SC-2"); + + // generate queue A, queue B, queue C + FederationQueueWeight rootA = generateFederationQueueWeight("root.a", subClusterLists); + FederationQueueWeight rootB = generateFederationQueueWeight("root.b", subClusterLists); + FederationQueueWeight rootC = generateFederationQueueWeight("root.b", subClusterLists); + + List<FederationQueueWeight> federationQueueWeights = new ArrayList<>(); + federationQueueWeights.add(rootA); + federationQueueWeights.add(rootB); + federationQueueWeights.add(rootC); + + // Step1. Save Queue Policies in Batches + BatchSaveFederationQueuePoliciesRequest request = + BatchSaveFederationQueuePoliciesRequest.newInstance(federationQueueWeights); + + BatchSaveFederationQueuePoliciesResponse policiesResponse = + interceptor.batchSaveFederationQueuePolicies(request); + + assertNotNull(policiesResponse); + assertNotNull(policiesResponse.getMessage()); + assertEquals("batch save policies success.", policiesResponse.getMessage()); + + // Step2. We query Policy information from FederationStateStore. + FederationStateStoreFacade federationFacade = interceptor.getFederationFacade(); + SubClusterPolicyConfiguration policyConfiguration = + federationFacade.getPolicyConfiguration("root.a"); + assertNotNull(policyConfiguration); + assertEquals("root.a", policyConfiguration.getQueue()); + + ByteBuffer params = policyConfiguration.getParams(); + assertNotNull(params); + WeightedPolicyInfo weightedPolicyInfo = WeightedPolicyInfo.fromByteBuffer(params); + assertNotNull(weightedPolicyInfo); + Map<SubClusterIdInfo, Float> amrmPolicyWeights = weightedPolicyInfo.getAMRMPolicyWeights(); + Map<SubClusterIdInfo, Float> routerPolicyWeights = weightedPolicyInfo.getRouterPolicyWeights(); + + SubClusterIdInfo sc1 = new SubClusterIdInfo("SC-1"); + SubClusterIdInfo sc2 = new SubClusterIdInfo("SC-2"); + + // Check whether the AMRMWeight of SC-1 and SC-2 of root.a meet expectations + FederationQueueWeight queueWeight = federationQueueWeights.get(0); + Map<SubClusterIdInfo, Float> subClusterAmrmWeightMap = + interceptor.getSubClusterWeightMap(queueWeight.getAmrmWeight()); + Float sc1ExpectedAmrmWeightFloat = amrmPolicyWeights.get(sc1); + Float sc1AmrmWeightFloat = subClusterAmrmWeightMap.get(sc1); + assertNotNull(sc1AmrmWeightFloat); + assertEquals(sc1ExpectedAmrmWeightFloat, sc1AmrmWeightFloat, 0.00001); + + Float sc2ExpectedAmrmWeightFloat = amrmPolicyWeights.get(sc2); + Float sc2AmrmWeightFloat = subClusterAmrmWeightMap.get(sc2); + assertNotNull(sc2ExpectedAmrmWeightFloat); + assertEquals(sc2ExpectedAmrmWeightFloat, sc2AmrmWeightFloat, 0.00001); + + // Check whether the RouterPolicyWeight of SC-1 and SC-2 of root.a meet expectations + Map<SubClusterIdInfo, Float> subClusterRouterWeightMap = + interceptor.getSubClusterWeightMap(queueWeight.getRouterWeight()); + Float sc1ExpectedRouterWeightFloat = routerPolicyWeights.get(sc1); + Float sc1RouterWeightFloat = subClusterRouterWeightMap.get(sc1); + assertNotNull(sc1RouterWeightFloat); + assertEquals(sc1ExpectedRouterWeightFloat, sc1RouterWeightFloat, 0.00001); + + Float sc2ExpectedRouterWeightFloat = routerPolicyWeights.get(sc2); + Float sc2RouterWeightFloat = subClusterRouterWeightMap.get(sc2); + assertNotNull(sc2ExpectedRouterWeightFloat); + assertEquals(sc2ExpectedRouterWeightFloat, sc2RouterWeightFloat, 0.00001); + } + + /** + * Generate FederationQueueWeight. + * We will generate the weight information of the queue. + * + * @param queue queue name + * @param pSubClusters subClusters + * @return subCluster FederationQueueWeight + */ + private FederationQueueWeight generateFederationQueueWeight( + String queue, List<String> pSubClusters) { + String routerWeight = generatePolicyWeight(pSubClusters); + String amrmWeight = generatePolicyWeight(pSubClusters); + String policyTypeName = WeightedLocalityPolicyManager.class.getCanonicalName(); + String headRoomAlpha = "1.0"; + return FederationQueueWeight.newInstance(routerWeight, amrmWeight, headRoomAlpha, + queue, policyTypeName); + } + + /** + * Generating Policy Weight Data. + * + * @param pSubClusters set of sub-clusters. + * @return policy Weight String, like SC-1:0.7,SC-2:0. + */ + private String generatePolicyWeight(List<String> pSubClusters) { + List<String> weights = generateWeights(subClusters.size()); + List<String> subClusterWeight = new ArrayList<>(); + for (int i = 0; i < pSubClusters.size(); i++) { + String subCluster = pSubClusters.get(i); + String weight = weights.get(i); + subClusterWeight.add(subCluster + ":" + weight); + } + return StringUtils.join(subClusterWeight, ","); + } + + /** + * Generate a set of random numbers, and the sum of the numbers is 1. + * + * @param n number of random numbers generated. + * @return a set of random numbers + */ + private List<String> generateWeights(int n) { + List<Float> randomNumbers = new ArrayList<>(); + float total = 0.0f; + + Random random = new Random(); + for (int i = 0; i < n - 1; i++) { + float randNum = random.nextFloat(); + randomNumbers.add(randNum); + total += randNum; + } + + float lastNumber = 1 - total; + randomNumbers.add(lastNumber); + + DecimalFormat decimalFormat = new DecimalFormat("#.##"); + List<String> formattedRandomNumbers = new ArrayList<>(); + for (double number : randomNumbers) { + String formattedNumber = decimalFormat.format(number); + formattedRandomNumbers.add(formattedNumber); + } + + return formattedRandomNumbers; + } } --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org