Repository: incubator-impala Updated Branches: refs/heads/master 1e63ff843 -> 8ad6d0331
IMPALA-5920: Remove admission control dependency on YARN RM jar Impala's admission controller relies on the YARN fair-scheduler.xml for configuration. That configuration is loaded using YARN directly (ie. as a library by the frontend). In Hadoop 3, a number of changes were made to the YARN resourcemanager which break Impala. While we eventually want to rethink the admission control configuration (IMPALA-4159), in the meantime we at least should avoid using unsupported YARN APIs. This patch removes the fe dependency on the YARN artifact 'hadoop-yarn-server-resourcemanager' which contains private APIs and isn't meant to be used as a library. A subset of the required code has been added in 'common/yarn-extras', taken from Hadoop 2.6.0 in CDH, e.g. see [1]. The code is added in packages 'org.apache.impala.*' instead of 'org.apache.yarn.*'. Some code could be copied as-is, those files are marked with the comment: //YARNUTIL: VANILLA Files that required some modifications are marked with: //YARNUTIL: MODIFIED Or, if all code except a dummy interface could be added: //YARNUTIL: DUMMY IMPL The goal the 'yarn-extras' is to make Impala's handling of the AC configuration self-sufficient, i.e. it shouldn't matter what version of Hadoop exists. As-is, this was tested and found to work when Hadoop 2.6 is installed. Because the yarn-extras/pom.xml still references hadoop-common, hadoop-yarn-common, and hadoop-yarn-api, additional testing will be required to ensure Impala using yarn-extras works when installed along side Hadoop 3. That testing for Hadoop 3 will be done later. Future changes will make any other changes required for existing code to work when Hadoop 3 is installed. Testing: * Ran existing tests on master. 1: https://www.cloudera.com/documentation/enterprise/release-notes/topics/cm_vd_cdh_package_tarball_512.html Change-Id: I7efdd8ebea298836ca2a82c0a4ae037ac9285bcf Reviewed-on: http://gerrit.cloudera.org:8080/8035 Reviewed-by: Matthew Jacobs <[email protected]> Tested-by: Matthew Jacobs <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/8ad6d033 Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/8ad6d033 Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/8ad6d033 Branch: refs/heads/master Commit: 8ad6d03310825418b6aa1d427200b07bc8bdb0bc Parents: 1e63ff8 Author: Matthew Jacobs <[email protected]> Authored: Mon Sep 11 10:53:07 2017 -0700 Committer: Matthew Jacobs <[email protected]> Committed: Tue Sep 19 21:32:07 2017 +0000 ---------------------------------------------------------------------- CMakeLists.txt | 1 + common/yarn-extras/CMakeLists.txt | 20 + common/yarn-extras/README.txt | 5 + common/yarn-extras/pom.xml | 118 ++++ .../resource/ResourceWeights.java | 26 + .../scheduler/fair/AllocationConfiguration.java | 184 +++++++ .../fair/AllocationConfigurationException.java | 40 ++ .../fair/AllocationFileLoaderService.java | 536 +++++++++++++++++++ .../scheduler/fair/FSQueueType.java | 32 ++ .../fair/FairSchedulerConfiguration.java | 77 +++ .../scheduler/fair/QueuePlacementPolicy.java | 181 +++++++ .../scheduler/fair/QueuePlacementRule.java | 367 +++++++++++++ .../scheduler/fair/SchedulingPolicy.java | 28 + .../impala/yarn/server/utils/BuilderUtils.java | 41 ++ fe/CMakeLists.txt | 3 +- fe/pom.xml | 19 +- .../apache/impala/util/RequestPoolService.java | 6 +- .../impala/util/TestRequestPoolService.java | 23 +- 18 files changed, 1690 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8ad6d033/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/CMakeLists.txt b/CMakeLists.txt index e8a2355..d60487f 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -346,6 +346,7 @@ include_directories(SYSTEM ${KUDU_CLIENT_INCLUDE_DIR}) add_subdirectory(common/function-registry) add_subdirectory(common/thrift) add_subdirectory(common/fbs) +add_subdirectory(common/yarn-extras) add_subdirectory(be) add_subdirectory(fe) add_subdirectory(ext-data-source) http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8ad6d033/common/yarn-extras/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/common/yarn-extras/CMakeLists.txt b/common/yarn-extras/CMakeLists.txt new file mode 100644 index 0000000..81144c3 --- /dev/null +++ b/common/yarn-extras/CMakeLists.txt @@ -0,0 +1,20 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +add_custom_target(yarn-extras ALL + COMMAND $ENV{IMPALA_HOME}/bin/mvn-quiet.sh install -DskipTests +) http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8ad6d033/common/yarn-extras/README.txt ---------------------------------------------------------------------- diff --git a/common/yarn-extras/README.txt b/common/yarn-extras/README.txt new file mode 100644 index 0000000..543c596 --- /dev/null +++ b/common/yarn-extras/README.txt @@ -0,0 +1,5 @@ +Extra Hadoop classes from Yarn needed by Impala. + +This is necessary because Impala has an admission controller that is configured using the +same configuration as Yarn (i.e. a fair-scheduler.xml). Some Yarn classes are used to +provide user to pool resolution, authorization, and accessing pool configurations. http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8ad6d033/common/yarn-extras/pom.xml ---------------------------------------------------------------------- diff --git a/common/yarn-extras/pom.xml b/common/yarn-extras/pom.xml new file mode 100644 index 0000000..8f3ba4d --- /dev/null +++ b/common/yarn-extras/pom.xml @@ -0,0 +1,118 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <modelVersion>4.0.0</modelVersion> + <groupId>org.apache.impala</groupId> + <artifactId>yarn-extras</artifactId> + <version>0.1-SNAPSHOT</version> + <name>YARN Extras</name> + <description>Extra Hadoop classes from YARN needed by Impala</description> + <packaging>jar</packaging> + <url>.</url> + + <properties> + <hadoop.version>${env.IMPALA_HADOOP_VERSION}</hadoop.version> + </properties> + + <repositories> + <repository> + <id>apache.snapshots</id> + <name>Apache Development Snapshot Repository</name> + <url>https://repository.apache.org/content/repositories/snapshots/</url> + <releases> + <enabled>false</enabled> + </releases> + <snapshots> + <enabled>true</enabled> + </snapshots> + </repository> + + <repository> + <id>cdh.rcs.releases.repo</id> + <url>https://repository.cloudera.com/content/groups/cdh-releases-rcs</url> + <name>CDH Releases Repository</name> + <snapshots> + <enabled>true</enabled> + </snapshots> + </repository> + + <repository> + <id>cdh.snapshots.repo</id> + <url>https://repository.cloudera.com/content/repositories/snapshots</url> + <name>CDH Snapshots Repository</name> + <snapshots> + <enabled>true</enabled> + </snapshots> + </repository> + + <repository> + <id>cdh.repo</id> + <url>https://repository.cloudera.com/content/groups/cloudera-repos</url> + <name>Cloudera Repositories</name> + <snapshots> + <enabled>false</enabled> + </snapshots> + </repository> + + <repository> + <id>Codehaus repository</id> + <url>http://repository.codehaus.org/</url> + <snapshots> + <enabled>false</enabled> + </snapshots> + </repository> + </repositories> + + <dependencies> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <version>${hadoop.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-api</artifactId> + <version>${hadoop.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-common</artifactId> + <version>${hadoop.version}</version> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <version>3.3</version> + <configuration> + <source>1.7</source> + <target>1.7</target> + </configuration> + </plugin> + </plugins> + </build> + +</project> http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8ad6d033/common/yarn-extras/src/main/java/org/apache/impala/yarn/server/resourcemanager/resource/ResourceWeights.java ---------------------------------------------------------------------- diff --git a/common/yarn-extras/src/main/java/org/apache/impala/yarn/server/resourcemanager/resource/ResourceWeights.java b/common/yarn-extras/src/main/java/org/apache/impala/yarn/server/resourcemanager/resource/ResourceWeights.java new file mode 100644 index 0000000..b4a8d67 --- /dev/null +++ b/common/yarn-extras/src/main/java/org/apache/impala/yarn/server/resourcemanager/resource/ResourceWeights.java @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.impala.yarn.server.resourcemanager.resource; +//YARNUTIL: DUMMY IMPL + +public class ResourceWeights { + + public ResourceWeights(float f) { + } +} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8ad6d033/common/yarn-extras/src/main/java/org/apache/impala/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java ---------------------------------------------------------------------- diff --git a/common/yarn-extras/src/main/java/org/apache/impala/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java b/common/yarn-extras/src/main/java/org/apache/impala/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java new file mode 100644 index 0000000..f304c6d --- /dev/null +++ b/common/yarn-extras/src/main/java/org/apache/impala/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java @@ -0,0 +1,184 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.impala.yarn.server.resourcemanager.scheduler.fair; +//YARNUTIL: MODIFIED + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.authorize.AccessControlList; +import org.apache.hadoop.yarn.api.records.QueueACL; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; +import org.apache.hadoop.yarn.util.resource.Resources; +import org.apache.impala.yarn.server.resourcemanager.resource.ResourceWeights; + +import com.google.common.annotations.VisibleForTesting; + +public class AllocationConfiguration { + private static final AccessControlList EVERYBODY_ACL = new AccessControlList("*"); + private static final AccessControlList NOBODY_ACL = new AccessControlList(" "); + private static final ResourceCalculator RESOURCE_CALCULATOR = + new DefaultResourceCalculator(); + // Minimum resource allocation for each queue + private final Map<String, Resource> minQueueResources; + // Maximum amount of resources per queue + @VisibleForTesting + final Map<String, Resource> maxQueueResources; + + private final Resource queueMaxResourcesDefault; + + // ACL's for each queue. Only specifies non-default ACL's from configuration. + private final Map<String, Map<QueueACL, AccessControlList>> queueAcls; + + // Policy for mapping apps to queues + @VisibleForTesting + QueuePlacementPolicy placementPolicy; + + //Configured queues in the alloc xml + @VisibleForTesting + Map<FSQueueType, Set<String>> configuredQueues; + + public AllocationConfiguration(Map<String, Resource> minQueueResources, + Map<String, Resource> maxQueueResources, + Map<String, Resource> maxChildQueueResources, + Map<String, Integer> queueMaxApps, Map<String, Integer> userMaxApps, + Map<String, ResourceWeights> queueWeights, + Map<String, Float> queueMaxAMShares, int userMaxAppsDefault, + int queueMaxAppsDefault, Resource queueMaxResourcesDefault, + float queueMaxAMShareDefault, + Map<String, SchedulingPolicy> schedulingPolicies, + SchedulingPolicy defaultSchedulingPolicy, + Map<String, Long> minSharePreemptionTimeouts, + Map<String, Long> fairSharePreemptionTimeouts, + Map<String, Float> fairSharePreemptionThresholds, + Map<String, Map<QueueACL, AccessControlList>> queueAcls, + QueuePlacementPolicy placementPolicy, + Map<FSQueueType, Set<String>> configuredQueues, + Set<String> nonPreemptableQueues) { + this.minQueueResources = minQueueResources; + this.maxQueueResources = maxQueueResources; + this.queueMaxResourcesDefault = queueMaxResourcesDefault; + this.queueAcls = queueAcls; + this.placementPolicy = placementPolicy; + this.configuredQueues = configuredQueues; + } + + public AllocationConfiguration(Configuration conf) { + minQueueResources = new HashMap<>(); + maxQueueResources = new HashMap<>(); + queueMaxResourcesDefault = Resources.unbounded(); + queueAcls = new HashMap<>(); + configuredQueues = new HashMap<>(); + for (FSQueueType queueType : FSQueueType.values()) { + configuredQueues.put(queueType, new HashSet<String>()); + } + placementPolicy = + QueuePlacementPolicy.fromConfiguration(conf, configuredQueues); + } + + /** + * Get the ACLs associated with this queue. If a given ACL is not explicitly + * configured, include the default value for that ACL. The default for the + * root queue is everybody ("*") and the default for all other queues is + * nobody ("") + */ + public AccessControlList getQueueAcl(String queue, QueueACL operation) { + Map<QueueACL, AccessControlList> queueAcls = this.queueAcls.get(queue); + if (queueAcls != null) { + AccessControlList operationAcl = queueAcls.get(operation); + if (operationAcl != null) { + return operationAcl; + } + } + return (queue.equals("root")) ? EVERYBODY_ACL : NOBODY_ACL; + } + + /** + * Get the minimum resource allocation for the given queue. + * + * @param queue the target queue's name + * @return the min allocation on this queue or {@link Resources#none} + * if not set + */ + public Resource getMinResources(String queue) { + Resource minQueueResource = minQueueResources.get(queue); + return (minQueueResource == null) ? Resources.none() : minQueueResource; + } + + /** + * Set the maximum resource allocation for the given queue. + * + * @param queue the target queue + * @param maxResource the maximum resource allocation + */ + void setMaxResources(String queue, Resource maxResource) { + maxQueueResources.put(queue, maxResource); + } + + /** + * Get the maximum resource allocation for the given queue. If the max in not + * set, return the larger of the min and the default max. + * + * @param queue the target queue's name + * @return the max allocation on this queue + */ + public Resource getMaxResources(String queue) { + Resource maxQueueResource = maxQueueResources.get(queue); + if (maxQueueResource == null) { + Resource minQueueResource = minQueueResources.get(queue); + if (minQueueResource != null && + Resources.greaterThan(RESOURCE_CALCULATOR, Resources.unbounded(), + minQueueResource, queueMaxResourcesDefault)) { + return minQueueResource; + } else { + return queueMaxResourcesDefault; + } + } else { + return maxQueueResource; + } + } + + public boolean hasAccess(String queueName, QueueACL acl, + UserGroupInformation user) { + int lastPeriodIndex = queueName.length(); + while (lastPeriodIndex != -1) { + String queue = queueName.substring(0, lastPeriodIndex); + if (getQueueAcl(queue, acl).isUserAllowed(user)) { + return true; + } + + lastPeriodIndex = queueName.lastIndexOf('.', lastPeriodIndex - 1); + } + + return false; + } + + public Map<FSQueueType, Set<String>> getConfiguredQueues() { + return configuredQueues; + } + + public QueuePlacementPolicy getPlacementPolicy() { + return placementPolicy; + } +} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8ad6d033/common/yarn-extras/src/main/java/org/apache/impala/yarn/server/resourcemanager/scheduler/fair/AllocationConfigurationException.java ---------------------------------------------------------------------- diff --git a/common/yarn-extras/src/main/java/org/apache/impala/yarn/server/resourcemanager/scheduler/fair/AllocationConfigurationException.java b/common/yarn-extras/src/main/java/org/apache/impala/yarn/server/resourcemanager/scheduler/fair/AllocationConfigurationException.java new file mode 100644 index 0000000..1f848fb --- /dev/null +++ b/common/yarn-extras/src/main/java/org/apache/impala/yarn/server/resourcemanager/scheduler/fair/AllocationConfigurationException.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.impala.yarn.server.resourcemanager.scheduler.fair; +//YARNUTIL: VANILLA + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; + +/** + * Thrown when the allocation file for {@link QueueManager} is malformed. + */ +@Private +@Unstable +public class AllocationConfigurationException extends Exception { + private static final long serialVersionUID = 4046517047810854249L; + + public AllocationConfigurationException(String message) { + super(message); + } + + public AllocationConfigurationException(String message, Throwable t) { + super(message, t); + } +} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8ad6d033/common/yarn-extras/src/main/java/org/apache/impala/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java ---------------------------------------------------------------------- diff --git a/common/yarn-extras/src/main/java/org/apache/impala/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java b/common/yarn-extras/src/main/java/org/apache/impala/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java new file mode 100644 index 0000000..ed9943a --- /dev/null +++ b/common/yarn-extras/src/main/java/org/apache/impala/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java @@ -0,0 +1,536 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.impala.yarn.server.resourcemanager.scheduler.fair; +//YARNUTIL: VANILLA + +import java.io.File; +import java.io.IOException; +import java.net.URL; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import javax.xml.parsers.DocumentBuilder; +import javax.xml.parsers.DocumentBuilderFactory; +import javax.xml.parsers.ParserConfigurationException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.authorize.AccessControlList; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.api.records.QueueACL; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.util.Clock; +import org.apache.hadoop.yarn.util.SystemClock; +import org.apache.hadoop.yarn.util.resource.Resources; +import org.apache.impala.yarn.server.resourcemanager.resource.ResourceWeights; +import org.w3c.dom.Document; +import org.w3c.dom.Element; +import org.w3c.dom.Node; +import org.w3c.dom.NodeList; +import org.w3c.dom.Text; +import org.xml.sax.SAXException; + +import com.google.common.base.CharMatcher; +import com.google.common.annotations.VisibleForTesting; + +@Public +@Unstable +public class AllocationFileLoaderService extends AbstractService { + + public static final Log LOG = LogFactory.getLog( + AllocationFileLoaderService.class.getName()); + + /** Time to wait between checks of the allocation file */ + public static final long ALLOC_RELOAD_INTERVAL_MS = 10 * 1000; + + /** + * Time to wait after the allocation has been modified before reloading it + * (this is done to prevent loading a file that hasn't been fully written). + */ + public static final long ALLOC_RELOAD_WAIT_MS = 5 * 1000; + + public static final long THREAD_JOIN_TIMEOUT_MS = 1000; + + private final Clock clock; + + private long lastSuccessfulReload; // Last time we successfully reloaded queues + private boolean lastReloadAttemptFailed = false; + + // Path to XML file containing allocations. + private File allocFile; + + private Listener reloadListener; + + @VisibleForTesting + long reloadIntervalMs = ALLOC_RELOAD_INTERVAL_MS; + + private Thread reloadThread; + private volatile boolean running = true; + + public AllocationFileLoaderService() { + this(new SystemClock()); + } + + public AllocationFileLoaderService(Clock clock) { + super(AllocationFileLoaderService.class.getName()); + this.clock = clock; + + } + + @Override + public void serviceInit(Configuration conf) throws Exception { + this.allocFile = getAllocationFile(conf); + if (allocFile != null) { + reloadThread = new Thread() { + @Override + public void run() { + while (running) { + long time = clock.getTime(); + long lastModified = allocFile.lastModified(); + if (lastModified > lastSuccessfulReload && + time > lastModified + ALLOC_RELOAD_WAIT_MS) { + try { + reloadAllocations(); + } catch (Exception ex) { + if (!lastReloadAttemptFailed) { + LOG.error("Failed to reload fair scheduler config file - " + + "will use existing allocations.", ex); + } + lastReloadAttemptFailed = true; + } + } else if (lastModified == 0l) { + if (!lastReloadAttemptFailed) { + LOG.warn("Failed to reload fair scheduler config file because" + + " last modified returned 0. File exists: " + + allocFile.exists()); + } + lastReloadAttemptFailed = true; + } + try { + Thread.sleep(reloadIntervalMs); + } catch (InterruptedException ex) { + LOG.info( + "Interrupted while waiting to reload alloc configuration"); + } + } + } + }; + reloadThread.setName("AllocationFileReloader"); + reloadThread.setDaemon(true); + } + super.serviceInit(conf); + } + + @Override + public void serviceStart() throws Exception { + if (reloadThread != null) { + reloadThread.start(); + } + super.serviceStart(); + } + + @Override + public void serviceStop() throws Exception { + running = false; + if (reloadThread != null) { + reloadThread.interrupt(); + try { + reloadThread.join(THREAD_JOIN_TIMEOUT_MS); + } catch (InterruptedException e) { + LOG.warn("reloadThread fails to join."); + } + } + super.serviceStop(); + } + + /** + * Path to XML file containing allocations. If the + * path is relative, it is searched for in the + * classpath, but loaded like a regular File. + */ + public File getAllocationFile(Configuration conf) { + String allocFilePath = conf.get(FairSchedulerConfiguration.ALLOCATION_FILE, + FairSchedulerConfiguration.DEFAULT_ALLOCATION_FILE); + File allocFile = new File(allocFilePath); + if (!allocFile.isAbsolute()) { + URL url = Thread.currentThread().getContextClassLoader() + .getResource(allocFilePath); + if (url == null) { + LOG.warn(allocFilePath + " not found on the classpath."); + allocFile = null; + } else if (!url.getProtocol().equalsIgnoreCase("file")) { + throw new RuntimeException("Allocation file " + url + + " found on the classpath is not on the local filesystem."); + } else { + allocFile = new File(url.getPath()); + } + } + return allocFile; + } + + public synchronized void setReloadListener(Listener reloadListener) { + this.reloadListener = reloadListener; + } + + /** + * Updates the allocation list from the allocation config file. This file is + * expected to be in the XML format specified in the design doc. + * + * @throws IOException if the config file cannot be read. + * @throws AllocationConfigurationException if allocations are invalid. + * @throws ParserConfigurationException if XML parser is misconfigured. + * @throws SAXException if config file is malformed. + */ + public synchronized void reloadAllocations() throws IOException, + ParserConfigurationException, SAXException, + AllocationConfigurationException { + if (allocFile == null) { + return; + } + LOG.info("Loading allocation file " + allocFile); + // Create some temporary hashmaps to hold the new allocs, and we only save + // them in our fields if we have parsed the entire allocs file successfully. + Map<String, Resource> minQueueResources = new HashMap<>(); + Map<String, Resource> maxQueueResources = new HashMap<>(); + Map<String, Resource> maxChildQueueResources = new HashMap<>(); + Map<String, Integer> queueMaxApps = new HashMap<>(); + Map<String, Integer> userMaxApps = new HashMap<>(); + Map<String, Float> queueMaxAMShares = new HashMap<>(); + Map<String, ResourceWeights> queueWeights = new HashMap<>(); + Map<String, SchedulingPolicy> queuePolicies = new HashMap<>(); + Map<String, Long> minSharePreemptionTimeouts = new HashMap<>(); + Map<String, Long> fairSharePreemptionTimeouts = new HashMap<>(); + Map<String, Float> fairSharePreemptionThresholds = new HashMap<>(); + Map<String, Map<QueueACL, AccessControlList>> queueAcls = new HashMap<>(); + Set<String> nonPreemptableQueues = new HashSet<>(); + int userMaxAppsDefault = Integer.MAX_VALUE; + int queueMaxAppsDefault = Integer.MAX_VALUE; + Resource queueMaxResourcesDefault = Resources.unbounded(); + float queueMaxAMShareDefault = 0.5f; + long defaultFairSharePreemptionTimeout = Long.MAX_VALUE; + long defaultMinSharePreemptionTimeout = Long.MAX_VALUE; + float defaultFairSharePreemptionThreshold = 0.5f; + SchedulingPolicy defaultSchedPolicy = SchedulingPolicy.DEFAULT_POLICY; + + QueuePlacementPolicy newPlacementPolicy = null; + + // Remember all queue names so we can display them on web UI, etc. + // configuredQueues is segregated based on whether it is a leaf queue + // or a parent queue. This information is used for creating queues + // and also for making queue placement decisions(QueuePlacementRule.java). + Map<FSQueueType, Set<String>> configuredQueues = new HashMap<>(); + + for (FSQueueType queueType : FSQueueType.values()) { + configuredQueues.put(queueType, new HashSet<String>()); + } + + // Read and parse the allocations file. + DocumentBuilderFactory docBuilderFactory = + DocumentBuilderFactory.newInstance(); + docBuilderFactory.setIgnoringComments(true); + DocumentBuilder builder = docBuilderFactory.newDocumentBuilder(); + Document doc = builder.parse(allocFile); + Element root = doc.getDocumentElement(); + if (!"allocations".equals(root.getTagName())) + throw new AllocationConfigurationException("Bad fair scheduler config " + + "file: top-level element not <allocations>"); + NodeList elements = root.getChildNodes(); + List<Element> queueElements = new ArrayList<Element>(); + Element placementPolicyElement = null; + for (int i = 0; i < elements.getLength(); i++) { + Node node = elements.item(i); + if (node instanceof Element) { + Element element = (Element)node; + if ("queue".equals(element.getTagName()) || + "pool".equals(element.getTagName())) { + queueElements.add(element); + } else if ("user".equals(element.getTagName())) { + String userName = element.getAttribute("name"); + NodeList fields = element.getChildNodes(); + for (int j = 0; j < fields.getLength(); j++) { + Node fieldNode = fields.item(j); + if (!(fieldNode instanceof Element)) + continue; + Element field = (Element) fieldNode; + if ("maxRunningApps".equals(field.getTagName())) { + String text = ((Text)field.getFirstChild()).getData().trim(); + int val = Integer.parseInt(text); + userMaxApps.put(userName, val); + } + } + } else if ("queueMaxResourcesDefault".equals(element.getTagName())) { + String text = ((Text)element.getFirstChild()).getData().trim(); + Resource val = + FairSchedulerConfiguration.parseResourceConfigValue(text); + queueMaxResourcesDefault = val; + } else if ("userMaxAppsDefault".equals(element.getTagName())) { + String text = ((Text)element.getFirstChild()).getData().trim(); + int val = Integer.parseInt(text); + userMaxAppsDefault = val; + } else if ("defaultFairSharePreemptionTimeout" + .equals(element.getTagName())) { + String text = ((Text)element.getFirstChild()).getData().trim(); + long val = Long.parseLong(text) * 1000L; + defaultFairSharePreemptionTimeout = val; + } else if ("fairSharePreemptionTimeout".equals(element.getTagName())) { + if (defaultFairSharePreemptionTimeout == Long.MAX_VALUE) { + String text = ((Text)element.getFirstChild()).getData().trim(); + long val = Long.parseLong(text) * 1000L; + defaultFairSharePreemptionTimeout = val; + } + } else if ("defaultMinSharePreemptionTimeout" + .equals(element.getTagName())) { + String text = ((Text)element.getFirstChild()).getData().trim(); + long val = Long.parseLong(text) * 1000L; + defaultMinSharePreemptionTimeout = val; + } else if ("defaultFairSharePreemptionThreshold" + .equals(element.getTagName())) { + String text = ((Text)element.getFirstChild()).getData().trim(); + float val = Float.parseFloat(text); + val = Math.max(Math.min(val, 1.0f), 0.0f); + defaultFairSharePreemptionThreshold = val; + } else if ("queueMaxAppsDefault".equals(element.getTagName())) { + String text = ((Text)element.getFirstChild()).getData().trim(); + int val = Integer.parseInt(text); + queueMaxAppsDefault = val; + } else if ("queueMaxAMShareDefault".equals(element.getTagName())) { + String text = ((Text)element.getFirstChild()).getData().trim(); + float val = Float.parseFloat(text); + val = Math.min(val, 1.0f); + queueMaxAMShareDefault = val; + } else if ("defaultQueueSchedulingPolicy".equals(element.getTagName()) + || "defaultQueueSchedulingMode".equals(element.getTagName())) { + String text = ((Text)element.getFirstChild()).getData().trim(); + if (text.equalsIgnoreCase("FIFO")) { + throw new AllocationConfigurationException("Bad fair scheduler " + + "config file: defaultQueueSchedulingPolicy or " + + "defaultQueueSchedulingMode can't be FIFO."); + } + defaultSchedPolicy = SchedulingPolicy.parse(text); + } else if ("queuePlacementPolicy".equals(element.getTagName())) { + placementPolicyElement = element; + } else { + LOG.warn("Bad element in allocations file: " + element.getTagName()); + } + } + } + + // Load queue elements. A root queue can either be included or omitted. If + // it's included, all other queues must be inside it. + for (Element element : queueElements) { + String parent = "root"; + if (element.getAttribute("name").equalsIgnoreCase("root")) { + if (queueElements.size() > 1) { + throw new AllocationConfigurationException("If configuring root queue," + + " no other queues can be placed alongside it."); + } + parent = null; + } + loadQueue(parent, element, minQueueResources, maxQueueResources, + maxChildQueueResources, queueMaxApps, userMaxApps, queueMaxAMShares, + queueWeights, queuePolicies, minSharePreemptionTimeouts, + fairSharePreemptionTimeouts, fairSharePreemptionThresholds, + queueAcls, configuredQueues, nonPreemptableQueues); + } + + // Load placement policy and pass it configured queues + Configuration conf = getConfig(); + if (placementPolicyElement != null) { + newPlacementPolicy = QueuePlacementPolicy.fromXml(placementPolicyElement, + configuredQueues, conf); + } else { + newPlacementPolicy = QueuePlacementPolicy.fromConfiguration(conf, + configuredQueues); + } + + // Set the min/fair share preemption timeout for the root queue + if (!minSharePreemptionTimeouts.containsKey("root")){ + minSharePreemptionTimeouts.put("root", + defaultMinSharePreemptionTimeout); + } + if (!fairSharePreemptionTimeouts.containsKey("root")) { + fairSharePreemptionTimeouts.put("root", + defaultFairSharePreemptionTimeout); + } + + // Set the fair share preemption threshold for the root queue + if (!fairSharePreemptionThresholds.containsKey("root")) { + fairSharePreemptionThresholds.put("root", + defaultFairSharePreemptionThreshold); + } + + AllocationConfiguration info = + new AllocationConfiguration(minQueueResources, maxQueueResources, + maxChildQueueResources, queueMaxApps, userMaxApps, queueWeights, + queueMaxAMShares, userMaxAppsDefault, queueMaxAppsDefault, + queueMaxResourcesDefault, queueMaxAMShareDefault, queuePolicies, + defaultSchedPolicy, minSharePreemptionTimeouts, + fairSharePreemptionTimeouts, fairSharePreemptionThresholds, queueAcls, + newPlacementPolicy, configuredQueues, nonPreemptableQueues); + lastSuccessfulReload = clock.getTime(); + lastReloadAttemptFailed = false; + + reloadListener.onReload(info); + } + + /** + * Loads a queue from a queue element in the configuration file + */ + private void loadQueue(String parentName, Element element, + Map<String, Resource> minQueueResources, + Map<String, Resource> maxQueueResources, + Map<String, Resource> maxChildQueueResources, + Map<String, Integer> queueMaxApps, + Map<String, Integer> userMaxApps, + Map<String, Float> queueMaxAMShares, + Map<String, ResourceWeights> queueWeights, + Map<String, SchedulingPolicy> queuePolicies, + Map<String, Long> minSharePreemptionTimeouts, + Map<String, Long> fairSharePreemptionTimeouts, + Map<String, Float> fairSharePreemptionThresholds, + Map<String, Map<QueueACL, AccessControlList>> queueAcls, + Map<FSQueueType, Set<String>> configuredQueues, + Set<String> nonPreemptableQueues) + throws AllocationConfigurationException { + String queueName = CharMatcher.WHITESPACE.trimFrom( + element.getAttribute("name")); + + if (queueName.contains(".")) { + throw new AllocationConfigurationException("Bad fair scheduler config " + + "file: queue name (" + queueName + ") shouldn't contain period."); + } + + if (queueName.isEmpty()) { + throw new AllocationConfigurationException("Bad fair scheduler config " + + "file: queue name shouldn't be empty or " + + "consist only of whitespace."); + } + + if (parentName != null) { + queueName = parentName + "." + queueName; + } + + Map<QueueACL, AccessControlList> acls = new HashMap<>(); + NodeList fields = element.getChildNodes(); + boolean isLeaf = true; + + for (int j = 0; j < fields.getLength(); j++) { + Node fieldNode = fields.item(j); + if (!(fieldNode instanceof Element)) + continue; + Element field = (Element) fieldNode; + if ("minResources".equals(field.getTagName())) { + String text = ((Text)field.getFirstChild()).getData().trim(); + Resource val = + FairSchedulerConfiguration.parseResourceConfigValue(text); + minQueueResources.put(queueName, val); + } else if ("maxResources".equals(field.getTagName())) { + String text = ((Text)field.getFirstChild()).getData().trim(); + Resource val = + FairSchedulerConfiguration.parseResourceConfigValue(text); + maxQueueResources.put(queueName, val); + } else if ("maxChildResources".equals(field.getTagName())) { + String text = ((Text)field.getFirstChild()).getData().trim(); + Resource val = + FairSchedulerConfiguration.parseResourceConfigValue(text); + maxChildQueueResources.put(queueName, val); + } else if ("maxRunningApps".equals(field.getTagName())) { + String text = ((Text)field.getFirstChild()).getData().trim(); + int val = Integer.parseInt(text); + queueMaxApps.put(queueName, val); + } else if ("maxAMShare".equals(field.getTagName())) { + String text = ((Text)field.getFirstChild()).getData().trim(); + float val = Float.parseFloat(text); + val = Math.min(val, 1.0f); + queueMaxAMShares.put(queueName, val); + } else if ("weight".equals(field.getTagName())) { + String text = ((Text)field.getFirstChild()).getData().trim(); + double val = Double.parseDouble(text); + queueWeights.put(queueName, new ResourceWeights((float)val)); + } else if ("minSharePreemptionTimeout".equals(field.getTagName())) { + String text = ((Text)field.getFirstChild()).getData().trim(); + long val = Long.parseLong(text) * 1000L; + minSharePreemptionTimeouts.put(queueName, val); + } else if ("fairSharePreemptionTimeout".equals(field.getTagName())) { + String text = ((Text)field.getFirstChild()).getData().trim(); + long val = Long.parseLong(text) * 1000L; + fairSharePreemptionTimeouts.put(queueName, val); + } else if ("fairSharePreemptionThreshold".equals(field.getTagName())) { + String text = ((Text)field.getFirstChild()).getData().trim(); + float val = Float.parseFloat(text); + val = Math.max(Math.min(val, 1.0f), 0.0f); + fairSharePreemptionThresholds.put(queueName, val); + } else if ("schedulingPolicy".equals(field.getTagName()) + || "schedulingMode".equals(field.getTagName())) { + String text = ((Text)field.getFirstChild()).getData().trim(); + SchedulingPolicy policy = SchedulingPolicy.parse(text); + queuePolicies.put(queueName, policy); + } else if ("aclSubmitApps".equals(field.getTagName())) { + String text = ((Text)field.getFirstChild()).getData(); + acls.put(QueueACL.SUBMIT_APPLICATIONS, new AccessControlList(text)); + } else if ("aclAdministerApps".equals(field.getTagName())) { + String text = ((Text)field.getFirstChild()).getData(); + acls.put(QueueACL.ADMINISTER_QUEUE, new AccessControlList(text)); + } else if ("allowPreemptionFrom".equals(field.getTagName())) { + String text = ((Text)field.getFirstChild()).getData().trim(); + if (!Boolean.parseBoolean(text)) { + nonPreemptableQueues.add(queueName); + } + } else if ("queue".endsWith(field.getTagName()) || + "pool".equals(field.getTagName())) { + loadQueue(queueName, field, minQueueResources, maxQueueResources, + maxChildQueueResources, queueMaxApps, userMaxApps, queueMaxAMShares, + queueWeights, queuePolicies, minSharePreemptionTimeouts, + fairSharePreemptionTimeouts, fairSharePreemptionThresholds, + queueAcls, configuredQueues, nonPreemptableQueues); + configuredQueues.get(FSQueueType.PARENT).add(queueName); + isLeaf = false; + } + } + if (isLeaf) { + // if a leaf in the alloc file is marked as type='parent' + // then store it under 'parent' + if ("parent".equals(element.getAttribute("type"))) { + configuredQueues.get(FSQueueType.PARENT).add(queueName); + } else { + configuredQueues.get(FSQueueType.LEAF).add(queueName); + } + } + queueAcls.put(queueName, acls); + if (maxQueueResources.containsKey(queueName) && + minQueueResources.containsKey(queueName) + && !Resources.fitsIn(minQueueResources.get(queueName), + maxQueueResources.get(queueName))) { + LOG.warn( + String.format("Queue %s has max resources %s less than " + + "min resources %s", queueName, maxQueueResources.get(queueName), + minQueueResources.get(queueName))); + } + } + + public interface Listener { + public void onReload(AllocationConfiguration info); + } +} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8ad6d033/common/yarn-extras/src/main/java/org/apache/impala/yarn/server/resourcemanager/scheduler/fair/FSQueueType.java ---------------------------------------------------------------------- diff --git a/common/yarn-extras/src/main/java/org/apache/impala/yarn/server/resourcemanager/scheduler/fair/FSQueueType.java b/common/yarn-extras/src/main/java/org/apache/impala/yarn/server/resourcemanager/scheduler/fair/FSQueueType.java new file mode 100644 index 0000000..ba8912d --- /dev/null +++ b/common/yarn-extras/src/main/java/org/apache/impala/yarn/server/resourcemanager/scheduler/fair/FSQueueType.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.impala.yarn.server.resourcemanager.scheduler.fair; +//YARNUTIL: VANILLA + +public enum FSQueueType { + /* + * Represents a leaf queue + */ + LEAF, + + /* + * Represents a parent queue + */ + PARENT +} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8ad6d033/common/yarn-extras/src/main/java/org/apache/impala/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java ---------------------------------------------------------------------- diff --git a/common/yarn-extras/src/main/java/org/apache/impala/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java b/common/yarn-extras/src/main/java/org/apache/impala/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java new file mode 100644 index 0000000..572ef74 --- /dev/null +++ b/common/yarn-extras/src/main/java/org/apache/impala/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.impala.yarn.server.resourcemanager.scheduler.fair; +//YARNUTIL: MODIFIED + +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.impala.yarn.server.utils.BuilderUtils; + +public class FairSchedulerConfiguration extends Configuration { + + private static final String CONF_PREFIX = "yarn.scheduler.fair."; + + public static final String ALLOCATION_FILE = CONF_PREFIX + "allocation.file"; + protected static final String DEFAULT_ALLOCATION_FILE = "fair-scheduler.xml"; + + /** Whether pools can be created that were not specified in the FS configuration file + */ + protected static final String ALLOW_UNDECLARED_POOLS = CONF_PREFIX + "allow-undeclared-pools"; + protected static final boolean DEFAULT_ALLOW_UNDECLARED_POOLS = true; + + /** + * Whether to use the user name as the queue name (instead of "default") if + * the request does not specify a queue. + */ + protected static final String USER_AS_DEFAULT_QUEUE = CONF_PREFIX + "user-as-default-queue"; + protected static final boolean DEFAULT_USER_AS_DEFAULT_QUEUE = true; + + /** + * Parses a resource config value of a form like "1024", "1024 mb", + * or "1024 mb, 3 vcores". If no units are given, megabytes are assumed. + * + * @throws AllocationConfigurationException + */ + public static Resource parseResourceConfigValue(String val) + throws AllocationConfigurationException { + try { + val = val.toLowerCase(); + int memory = findResource(val, "mb"); + int vcores = findResource(val, "vcores"); + return BuilderUtils.newResource(memory, vcores); + } catch (AllocationConfigurationException ex) { + throw ex; + } catch (Exception ex) { + throw new AllocationConfigurationException( + "Error reading resource config", ex); + } + } + + private static int findResource(String val, String units) + throws AllocationConfigurationException { + Pattern pattern = Pattern.compile("(\\d+)(\\.\\d*)?\\s*" + units); + Matcher matcher = pattern.matcher(val); + if (!matcher.find()) { + throw new AllocationConfigurationException("Missing resource: " + units); + } + return Integer.parseInt(matcher.group(1)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8ad6d033/common/yarn-extras/src/main/java/org/apache/impala/yarn/server/resourcemanager/scheduler/fair/QueuePlacementPolicy.java ---------------------------------------------------------------------- diff --git a/common/yarn-extras/src/main/java/org/apache/impala/yarn/server/resourcemanager/scheduler/fair/QueuePlacementPolicy.java b/common/yarn-extras/src/main/java/org/apache/impala/yarn/server/resourcemanager/scheduler/fair/QueuePlacementPolicy.java new file mode 100644 index 0000000..74c690c --- /dev/null +++ b/common/yarn-extras/src/main/java/org/apache/impala/yarn/server/resourcemanager/scheduler/fair/QueuePlacementPolicy.java @@ -0,0 +1,181 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.impala.yarn.server.resourcemanager.scheduler.fair; +//YARNUTIL: VANILLA + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.Groups; +import org.apache.hadoop.util.ReflectionUtils; +import org.w3c.dom.Element; +import org.w3c.dom.Node; +import org.w3c.dom.NodeList; + +@Private +@Unstable +public class QueuePlacementPolicy { + private static final Map<String, Class<? extends QueuePlacementRule>> ruleClasses; + static { + Map<String, Class<? extends QueuePlacementRule>> map = + new HashMap<String, Class<? extends QueuePlacementRule>>(); + map.put("user", QueuePlacementRule.User.class); + map.put("primaryGroup", QueuePlacementRule.PrimaryGroup.class); + map.put("secondaryGroupExistingQueue", + QueuePlacementRule.SecondaryGroupExistingQueue.class); + map.put("specified", QueuePlacementRule.Specified.class); + map.put("nestedUserQueue", + QueuePlacementRule.NestedUserQueue.class); + map.put("default", QueuePlacementRule.Default.class); + map.put("reject", QueuePlacementRule.Reject.class); + ruleClasses = Collections.unmodifiableMap(map); + } + + private final List<QueuePlacementRule> rules; + private final Map<FSQueueType, Set<String>> configuredQueues; + private final Groups groups; + + public QueuePlacementPolicy(List<QueuePlacementRule> rules, + Map<FSQueueType, Set<String>> configuredQueues, Configuration conf) + throws AllocationConfigurationException { + for (int i = 0; i < rules.size()-1; i++) { + if (rules.get(i).isTerminal()) { + throw new AllocationConfigurationException("Rules after rule " + + i + " in queue placement policy can never be reached"); + } + } + if (!rules.get(rules.size()-1).isTerminal()) { + throw new AllocationConfigurationException( + "Could get past last queue placement rule without assigning"); + } + this.rules = rules; + this.configuredQueues = configuredQueues; + groups = new Groups(conf); + } + + /** + * Builds a QueuePlacementPolicy from an xml element. + */ + public static QueuePlacementPolicy fromXml(Element el, + Map<FSQueueType, Set<String>> configuredQueues, Configuration conf) + throws AllocationConfigurationException { + List<QueuePlacementRule> rules = new ArrayList<QueuePlacementRule>(); + NodeList elements = el.getChildNodes(); + for (int i = 0; i < elements.getLength(); i++) { + Node node = elements.item(i); + if (node instanceof Element) { + QueuePlacementRule rule = createAndInitializeRule(node); + rules.add(rule); + } + } + return new QueuePlacementPolicy(rules, configuredQueues, conf); + } + + /** + * Create and initialize a rule given a xml node + * @param node + * @return QueuePlacementPolicy + * @throws AllocationConfigurationException + */ + public static QueuePlacementRule createAndInitializeRule(Node node) + throws AllocationConfigurationException { + Element element = (Element) node; + + String ruleName = element.getAttribute("name"); + if ("".equals(ruleName)) { + throw new AllocationConfigurationException("No name provided for a " + + "rule element"); + } + + Class<? extends QueuePlacementRule> clazz = ruleClasses.get(ruleName); + if (clazz == null) { + throw new AllocationConfigurationException("No rule class found for " + + ruleName); + } + QueuePlacementRule rule = ReflectionUtils.newInstance(clazz, null); + rule.initializeFromXml(element); + return rule; + } + + /** + * Build a simple queue placement policy from the allow-undeclared-pools and + * user-as-default-queue configuration options. + */ + public static QueuePlacementPolicy fromConfiguration(Configuration conf, + Map<FSQueueType, Set<String>> configuredQueues) { + boolean create = conf.getBoolean( + FairSchedulerConfiguration.ALLOW_UNDECLARED_POOLS, + FairSchedulerConfiguration.DEFAULT_ALLOW_UNDECLARED_POOLS); + boolean userAsDefaultQueue = conf.getBoolean( + FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, + FairSchedulerConfiguration.DEFAULT_USER_AS_DEFAULT_QUEUE); + List<QueuePlacementRule> rules = new ArrayList<QueuePlacementRule>(); + rules.add(new QueuePlacementRule.Specified().initialize(create, null)); + if (userAsDefaultQueue) { + rules.add(new QueuePlacementRule.User().initialize(create, null)); + } + if (!userAsDefaultQueue || !create) { + rules.add(new QueuePlacementRule.Default().initialize(true, null)); + } + try { + return new QueuePlacementPolicy(rules, configuredQueues, conf); + } catch (AllocationConfigurationException ex) { + throw new RuntimeException("Should never hit exception when loading" + + "placement policy from conf", ex); + } + } + + /** + * Applies this rule to an app with the given requested queue and user/group + * information. + * + * @param requestedQueue + * The queue specified in the ApplicationSubmissionContext + * @param user + * The user submitting the app + * @return + * The name of the queue to assign the app to. Or null if the app should + * be rejected. + * @throws IOException + * If an exception is encountered while getting the user's groups + */ + public String assignAppToQueue(String requestedQueue, String user) + throws IOException { + for (QueuePlacementRule rule : rules) { + String queue = rule.assignAppToQueue(requestedQueue, user, groups, + configuredQueues); + if (queue == null || !queue.isEmpty()) { + return queue; + } + } + throw new IllegalStateException("Should have applied a rule before " + + "reaching here"); + } + + public List<QueuePlacementRule> getRules() { + return rules; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8ad6d033/common/yarn-extras/src/main/java/org/apache/impala/yarn/server/resourcemanager/scheduler/fair/QueuePlacementRule.java ---------------------------------------------------------------------- diff --git a/common/yarn-extras/src/main/java/org/apache/impala/yarn/server/resourcemanager/scheduler/fair/QueuePlacementRule.java b/common/yarn-extras/src/main/java/org/apache/impala/yarn/server/resourcemanager/scheduler/fair/QueuePlacementRule.java new file mode 100644 index 0000000..4be9f73 --- /dev/null +++ b/common/yarn-extras/src/main/java/org/apache/impala/yarn/server/resourcemanager/scheduler/fair/QueuePlacementRule.java @@ -0,0 +1,367 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.impala.yarn.server.resourcemanager.scheduler.fair; + +//YARNUTIL: VANILLA +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.security.Groups; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.w3c.dom.Element; +import org.w3c.dom.NamedNodeMap; +import org.w3c.dom.Node; +import org.w3c.dom.NodeList; + +import com.google.common.annotations.VisibleForTesting; + +@Private +@Unstable +public abstract class QueuePlacementRule { + protected boolean create; + public static final Log LOG = + LogFactory.getLog(QueuePlacementRule.class.getName()); + + /** + * Initializes the rule with any arguments. + * + * @param args + * Additional attributes of the rule's xml element other than create. + */ + public QueuePlacementRule initialize(boolean create, Map<String, String> args) { + this.create = create; + return this; + } + + /** + * + * @param requestedQueue + * The queue explicitly requested. + * @param user + * The user submitting the app. + * @param groups + * The groups of the user submitting the app. + * @param configuredQueues + * The queues specified in the scheduler configuration. + * @return + * The queue to place the app into. An empty string indicates that we should + * continue to the next rule, and null indicates that the app should be rejected. + */ + public String assignAppToQueue(String requestedQueue, String user, + Groups groups, Map<FSQueueType, Set<String>> configuredQueues) + throws IOException { + String queue = getQueueForApp(requestedQueue, user, groups, + configuredQueues); + if (create || configuredQueues.get(FSQueueType.LEAF).contains(queue) + || configuredQueues.get(FSQueueType.PARENT).contains(queue)) { + return queue; + } else { + return ""; + } + } + + public void initializeFromXml(Element el) + throws AllocationConfigurationException { + boolean create = true; + NamedNodeMap attributes = el.getAttributes(); + Map<String, String> args = new HashMap<String, String>(); + for (int i = 0; i < attributes.getLength(); i++) { + Node node = attributes.item(i); + String key = node.getNodeName(); + String value = node.getNodeValue(); + if (key.equals("create")) { + create = Boolean.parseBoolean(value); + } else { + args.put(key, value); + } + } + initialize(create, args); + } + + /** + * Returns true if this rule never tells the policy to continue. + */ + public abstract boolean isTerminal(); + + /** + * Applies this rule to an app with the given requested queue and user/group + * information. + * + * @param requestedQueue + * The queue specified in the ApplicationSubmissionContext + * @param user + * The user submitting the app. + * @param groups + * The groups of the user submitting the app. + * @return + * The name of the queue to assign the app to, or null to empty string + * continue to the next rule. + */ + protected abstract String getQueueForApp(String requestedQueue, String user, + Groups groups, Map<FSQueueType, Set<String>> configuredQueues) + throws IOException; + + /** + * Places apps in queues by username of the submitter + */ + public static class User extends QueuePlacementRule { + @Override + protected String getQueueForApp(String requestedQueue, String user, + Groups groups, Map<FSQueueType, Set<String>> configuredQueues) { + return "root." + cleanName(user); + } + + @Override + public boolean isTerminal() { + return create; + } + } + + /** + * Places apps in queues by primary group of the submitter + */ + public static class PrimaryGroup extends QueuePlacementRule { + @Override + protected String getQueueForApp(String requestedQueue, String user, + Groups groups, Map<FSQueueType, Set<String>> configuredQueues) + throws IOException { + final List<String> groupList = groups.getGroups(user); + if (groupList.isEmpty()) { + throw new IOException("No groups returned for user " + user); + } + return "root." + cleanName(groupList.get(0)); + } + + @Override + public boolean isTerminal() { + return create; + } + } + + /** + * Places apps in queues by secondary group of the submitter + * + * Match will be made on first secondary group that exist in + * queues + */ + public static class SecondaryGroupExistingQueue extends QueuePlacementRule { + @Override + protected String getQueueForApp(String requestedQueue, String user, + Groups groups, Map<FSQueueType, Set<String>> configuredQueues) + throws IOException { + List<String> groupNames = groups.getGroups(user); + for (int i = 1; i < groupNames.size(); i++) { + String group = cleanName(groupNames.get(i)); + if (configuredQueues.get(FSQueueType.LEAF).contains("root." + group) + || configuredQueues.get(FSQueueType.PARENT).contains( + "root." + group)) { + return "root." + group; + } + } + + return ""; + } + + @Override + public boolean isTerminal() { + return false; + } + } + + /** + * Places apps in queues with name of the submitter under the queue + * returned by the nested rule. + */ + public static class NestedUserQueue extends QueuePlacementRule { + @VisibleForTesting + QueuePlacementRule nestedRule; + + /** + * Parse xml and instantiate the nested rule + */ + @Override + public void initializeFromXml(Element el) + throws AllocationConfigurationException { + NodeList elements = el.getChildNodes(); + + for (int i = 0; i < elements.getLength(); i++) { + Node node = elements.item(i); + if (node instanceof Element) { + Element element = (Element) node; + if ("rule".equals(element.getTagName())) { + QueuePlacementRule rule = QueuePlacementPolicy + .createAndInitializeRule(node); + if (rule == null) { + throw new AllocationConfigurationException( + "Unable to create nested rule in nestedUserQueue rule"); + } + this.nestedRule = rule; + break; + } else { + continue; + } + } + } + + if (this.nestedRule == null) { + throw new AllocationConfigurationException( + "No nested rule specified in <nestedUserQueue> rule"); + } + super.initializeFromXml(el); + } + + @Override + protected String getQueueForApp(String requestedQueue, String user, + Groups groups, Map<FSQueueType, Set<String>> configuredQueues) + throws IOException { + // Apply the nested rule + String queueName = nestedRule.assignAppToQueue(requestedQueue, user, + groups, configuredQueues); + + if (queueName != null && queueName.length() != 0) { + if (!queueName.startsWith("root.")) { + queueName = "root." + queueName; + } + + // Verify if the queue returned by the nested rule is an configured leaf queue, + // if yes then skip to next rule in the queue placement policy + if (configuredQueues.get(FSQueueType.LEAF).contains(queueName)) { + return ""; + } + return queueName + "." + cleanName(user); + } + return queueName; + } + + @Override + public boolean isTerminal() { + return false; + } + } + + /** + * Places apps in queues by requested queue of the submitter + */ + public static class Specified extends QueuePlacementRule { + @Override + protected String getQueueForApp(String requestedQueue, String user, + Groups groups, Map<FSQueueType, Set<String>> configuredQueues) { + if (requestedQueue.equals(YarnConfiguration.DEFAULT_QUEUE_NAME)) { + return ""; + } else { + if (!requestedQueue.startsWith("root.")) { + requestedQueue = "root." + requestedQueue; + } + return requestedQueue; + } + } + + @Override + public boolean isTerminal() { + return false; + } + } + + /** + * Places apps in the specified default queue. If no default queue is + * specified the app is placed in root.default queue. + */ + public static class Default extends QueuePlacementRule { + @VisibleForTesting + String defaultQueueName; + + @Override + public QueuePlacementRule initialize(boolean create, + Map<String, String> args) { + if (defaultQueueName == null) { + defaultQueueName = "root." + YarnConfiguration.DEFAULT_QUEUE_NAME; + } + return super.initialize(create, args); + } + + @Override + public void initializeFromXml(Element el) + throws AllocationConfigurationException { + defaultQueueName = el.getAttribute("queue"); + if (defaultQueueName != null && !defaultQueueName.isEmpty()) { + if (!defaultQueueName.startsWith("root.")) { + defaultQueueName = "root." + defaultQueueName; + } + } else { + defaultQueueName = "root." + YarnConfiguration.DEFAULT_QUEUE_NAME; + } + super.initializeFromXml(el); + } + + @Override + protected String getQueueForApp(String requestedQueue, String user, + Groups groups, Map<FSQueueType, Set<String>> configuredQueues) { + return defaultQueueName; + } + + @Override + public boolean isTerminal() { + return true; + } + } + + /** + * Rejects all apps + */ + public static class Reject extends QueuePlacementRule { + @Override + public String assignAppToQueue(String requestedQueue, String user, + Groups groups, Map<FSQueueType, Set<String>> configuredQueues) { + return null; + } + + @Override + protected String getQueueForApp(String requestedQueue, String user, + Groups groups, Map<FSQueueType, Set<String>> configuredQueues) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isTerminal() { + return true; + } + } + + /** + * Replace the periods in the username or groupname with "_dot_" and + * remove trailing and leading whitespace. + */ + protected String cleanName(String name) { + name = name.trim(); + if (name.contains(".")) { + String converted = name.replaceAll("\\.", "_dot_"); + LOG.warn("Name " + name + " is converted to " + converted + + " when it is used as a queue name."); + return converted; + } else { + return name; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8ad6d033/common/yarn-extras/src/main/java/org/apache/impala/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java ---------------------------------------------------------------------- diff --git a/common/yarn-extras/src/main/java/org/apache/impala/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java b/common/yarn-extras/src/main/java/org/apache/impala/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java new file mode 100644 index 0000000..0d19614 --- /dev/null +++ b/common/yarn-extras/src/main/java/org/apache/impala/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.impala.yarn.server.resourcemanager.scheduler.fair; +//YARNUTIL: DUMMY IMPL + +public class SchedulingPolicy { + public static final SchedulingPolicy DEFAULT_POLICY = null; + + public static SchedulingPolicy parse(String s) { + return DEFAULT_POLICY; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8ad6d033/common/yarn-extras/src/main/java/org/apache/impala/yarn/server/utils/BuilderUtils.java ---------------------------------------------------------------------- diff --git a/common/yarn-extras/src/main/java/org/apache/impala/yarn/server/utils/BuilderUtils.java b/common/yarn-extras/src/main/java/org/apache/impala/yarn/server/utils/BuilderUtils.java new file mode 100644 index 0000000..e62bc25 --- /dev/null +++ b/common/yarn-extras/src/main/java/org/apache/impala/yarn/server/utils/BuilderUtils.java @@ -0,0 +1,41 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.impala.yarn.server.utils; +//YARNUTIL: MODIFIED + +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.factories.RecordFactory; +import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; + +/** + * Builder utilities to construct various objects. + * + */ +public class BuilderUtils { + + private static final RecordFactory recordFactory = RecordFactoryProvider + .getRecordFactory(null); + + public static Resource newResource(int memory, int vCores) { + Resource resource = recordFactory.newRecordInstance(Resource.class); + resource.setMemory(memory); + resource.setVirtualCores(vCores); + return resource; + } +} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8ad6d033/fe/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/fe/CMakeLists.txt b/fe/CMakeLists.txt index eee1601..d5c7c1e 100644 --- a/fe/CMakeLists.txt +++ b/fe/CMakeLists.txt @@ -15,6 +15,7 @@ # specific language governing permissions and limitations # under the License. -add_custom_target(fe ALL DEPENDS thrift-deps fb-deps function-registry ext-data-source +add_custom_target(fe ALL DEPENDS + thrift-deps fb-deps yarn-extras function-registry ext-data-source COMMAND $ENV{IMPALA_HOME}/bin/mvn-quiet.sh install -DskipTests ) http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8ad6d033/fe/pom.xml ---------------------------------------------------------------------- diff --git a/fe/pom.xml b/fe/pom.xml index 0f7ae8c..a0330d7 100644 --- a/fe/pom.xml +++ b/fe/pom.xml @@ -47,6 +47,7 @@ under the License. <impala.extdatasrc.api.version>1.0-SNAPSHOT</impala.extdatasrc.api.version> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <kudu.version>${env.KUDU_JAVA_VERSION}</kudu.version> + <yarn-extras.version>${project.version}</yarn-extras.version> <eclipse.output.directory>eclipse-classes</eclipse.output.directory> </properties> @@ -98,24 +99,18 @@ under the License. </dependency> <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-yarn-api</artifactId> - <version>${hadoop.version}</version> - </dependency> - - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-yarn-server-resourcemanager</artifactId> - <version>${hadoop.version}</version> - </dependency> - - <dependency> <groupId>org.apache.sentry</groupId> <artifactId>sentry-core-common</artifactId> <version>${sentry.version}</version> </dependency> <dependency> + <groupId>org.apache.impala</groupId> + <artifactId>yarn-extras</artifactId> + <version>${yarn-extras.version}</version> + </dependency> + + <dependency> <groupId>org.apache.sentry</groupId> <artifactId>sentry-core-model-db</artifactId> <version>${sentry.version}</version> http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8ad6d033/fe/src/main/java/org/apache/impala/util/RequestPoolService.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/util/RequestPoolService.java b/fe/src/main/java/org/apache/impala/util/RequestPoolService.java index 9976623..5cea0e1 100644 --- a/fe/src/main/java/org/apache/impala/util/RequestPoolService.java +++ b/fe/src/main/java/org/apache/impala/util/RequestPoolService.java @@ -28,9 +28,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.AllocationConfiguration; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.AllocationFileLoaderService; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration; import org.apache.thrift.TException; import org.apache.thrift.TSerializer; import org.apache.thrift.protocol.TBinaryProtocol; @@ -49,6 +46,9 @@ import org.apache.impala.thrift.TResolveRequestPoolParams; import org.apache.impala.thrift.TResolveRequestPoolResult; import org.apache.impala.thrift.TStatus; import org.apache.impala.util.FileWatchService.FileChangeListener; +import org.apache.impala.yarn.server.resourcemanager.scheduler.fair.AllocationConfiguration; +import org.apache.impala.yarn.server.resourcemanager.scheduler.fair.AllocationFileLoaderService; +import org.apache.impala.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.base.Strings; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8ad6d033/fe/src/test/java/org/apache/impala/util/TestRequestPoolService.java ---------------------------------------------------------------------- diff --git a/fe/src/test/java/org/apache/impala/util/TestRequestPoolService.java b/fe/src/test/java/org/apache/impala/util/TestRequestPoolService.java index e849a81..e84a9bc 100644 --- a/fe/src/test/java/org/apache/impala/util/TestRequestPoolService.java +++ b/fe/src/test/java/org/apache/impala/util/TestRequestPoolService.java @@ -22,16 +22,25 @@ import java.io.IOException; import java.lang.reflect.Field; import java.net.URISyntaxException; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.AllocationFileLoaderService; import org.junit.After; +import org.junit.AfterClass; import org.junit.Assert; +import org.junit.BeforeClass; import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTH_TO_LOCAL; + +import org.apache.hadoop.conf.Configuration; + +import org.apache.impala.yarn.server.resourcemanager.scheduler.fair.AllocationFileLoaderService; + +import org.apache.impala.authorization.User; import org.apache.impala.common.ByteUnits; import org.apache.impala.common.InternalException; +import org.apache.impala.common.RuntimeEnv; import org.apache.impala.thrift.TErrorCode; import org.apache.impala.thrift.TPoolConfig; import org.apache.impala.thrift.TResolveRequestPoolParams; @@ -109,6 +118,18 @@ public class TestRequestPoolService { poolService_.start(); } + @BeforeClass + public static void setUpClass() throws Exception { + RuntimeEnv.INSTANCE.setTestEnv(true); + User.setRulesForTesting( + new Configuration().get(HADOOP_SECURITY_AUTH_TO_LOCAL, "DEFAULT")); + } + + @AfterClass + public static void cleanUpClass() { + RuntimeEnv.INSTANCE.reset(); + } + @After public void cleanUp() throws Exception { if (poolService_ != null) poolService_.stop();
