sohami commented on a change in pull request #1652: DRILL-7046: Support for 
loading and parsing new RM config file
URL: https://github.com/apache/drill/pull/1652#discussion_r260517754
 
 

 ##########
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/ResourcePoolTreeImpl.java
 ##########
 @@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.resourcemgr;
+
+import com.typesafe.config.Config;
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.resourcemgr.exception.QueueSelectionException;
+import org.apache.drill.exec.resourcemgr.exception.RMConfigException;
+import org.apache.drill.exec.resourcemgr.selectionpolicy.QueueSelectionPolicy;
+import 
org.apache.drill.exec.resourcemgr.selectionpolicy.QueueSelectionPolicyFactory;
+import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
+
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.drill.exec.resourcemgr.RMCommonDefaults.ROOT_POOL_DEFAULT_MEMORY_PERCENT;
+import static 
org.apache.drill.exec.resourcemgr.RMCommonDefaults.ROOT_POOL_DEFAULT_QUEUE_SELECTION_POLICY;
+
+/**
+ * Parses and initializes configuration for ResourceManagement in Drill. It 
takes care of creating all the ResourcePools
+ * recursively maintaining the n-ary tree hierarchy defined in the 
configuration.
+ */
+public class ResourcePoolTreeImpl implements ResourcePoolTree {
+  private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(ResourcePoolTreeImpl.class);
+
+  private ResourcePool rootPool;
+
+  private Config rmConfig;
+
+  private NodeResources totalNodeResources;
+
+  private double resourceShare;
+
+  private Map<String, QueryQueueConfig> leafQueues = Maps.newHashMap();
+
+  private QueueSelectionPolicy selectionPolicy;
+
+  public static final String ROOT_POOL_MEMORY_SHARE_KEY = "memory";
+
+  public static final String ROOT_POOL_QUEUE_SELECTION_POLICY_KEY = 
"queue_selection_policy";
+
+  public static final String ROOT_POOL_CONFIG_KEY = "drill.exec.rm";
+
+  public ResourcePoolTreeImpl(Config rmConfig, long totalNodeMemory,
+                              int totalNodePhysicalCpu, int vFactor) throws 
RMConfigException {
+    this(rmConfig, new NodeResources(totalNodeMemory, totalNodePhysicalCpu, 
vFactor));
+  }
+
+  private ResourcePoolTreeImpl(Config rmConfig, NodeResources 
totalNodeResources) throws RMConfigException {
+    try {
+      this.rmConfig = rmConfig;
+      this.totalNodeResources = totalNodeResources;
+      this.resourceShare = this.rmConfig.hasPath(ROOT_POOL_MEMORY_SHARE_KEY) ?
+        this.rmConfig.getDouble(ROOT_POOL_MEMORY_SHARE_KEY) : 
ROOT_POOL_DEFAULT_MEMORY_PERCENT;
+      this.selectionPolicy = QueueSelectionPolicyFactory.createSelectionPolicy(
+        this.rmConfig.hasPath(ROOT_POOL_QUEUE_SELECTION_POLICY_KEY) ?
+          this.rmConfig.getString(ROOT_POOL_QUEUE_SELECTION_POLICY_KEY) : 
ROOT_POOL_DEFAULT_QUEUE_SELECTION_POLICY);
+      rootPool = new 
ResourcePoolImpl(this.rmConfig.getConfig(ROOT_POOL_CONFIG_KEY), resourceShare, 
1.0,
+        totalNodeResources, null, leafQueues);
+      logger.debug("Dumping RM configuration {}", toString());
+    } catch (RMConfigException ex) {
+      throw ex;
+    } catch (Exception ex) {
+      throw new RMConfigException(String.format("Failure while parsing root 
pool configuration. " +
+        "[Details: Config: %s]", rmConfig), ex);
+    }
+  }
+
+  /**
+   * @return - Root {@link ResourcePool}
+   */
+  @Override
+  public ResourcePool getRootPool() {
+    return rootPool;
+  }
+
+  /**
+   * @return - Map containing all the configured leaf queues
+   */
+  @Override
+  public Map<String, QueryQueueConfig> getAllLeafQueues() {
+    return leafQueues;
+  }
+
+  @Override
+  public double getResourceShare() {
+    return resourceShare;
+  }
+
+  /**
+   * Creates {@link QueueAssignmentResult} which contains list of all selected 
leaf ResourcePools and all the rejected
+   * ResourcePools for the provided query. Performs DFS of the 
ResourcePoolTree to traverse and find
+   * selected/rejected ResourcePools.
+   * @param queryContext - {@link QueryContext} which contains metadata 
required for the given query
+   * @return - {@link QueueAssignmentResult} populated with selected/rejected 
ResourcePools
+   */
+  @Override
+  public QueueAssignmentResult selectAllQueues(QueryContext queryContext) {
+    QueueAssignmentResult assignmentResult = new QueueAssignmentResult();
+    rootPool.visitAndSelectPool(assignmentResult, queryContext);
+    return assignmentResult;
+  }
+
+  /**
+   * Selects a leaf queue out of all the possibles leaf queues returned by
+   * {@link ResourcePoolTree#selectAllQueues(QueryContext)} for a given query 
based on the configured
+   * {@link QueueSelectionPolicy}. If none of the queue qualifies then it 
throws {@link QueueSelectionException}
+   * @param queryContext - {@link QueryContext} which contains metadata for 
given query
+   * @param queryMaxNodeResource - Max resources on a node required for given 
query
+   * @return {@link QueryQueueConfig} for the selected leaf queue
+   * @throws QueueSelectionException - If no leaf queue is selected
+   */
+  @Override
+  public QueryQueueConfig selectOneQueue(QueryContext queryContext, 
NodeResources queryMaxNodeResource)
+    throws QueueSelectionException {
+    final QueueAssignmentResult assignmentResult = 
selectAllQueues(queryContext);
+    final List<ResourcePool> selectedPools = 
assignmentResult.getSelectedLeafPools();
+    if (selectedPools.size() == 0) {
+      throw new QueueSelectionException(String.format("No resource pools to 
choose from for the query: %s",
+        queryContext.getQueryId()));
+    } else if (selectedPools.size() == 1) {
+      return selectedPools.get(0).getQueuryQueue();
+    }
+
+    return selectionPolicy.selectQueue(selectedPools, queryContext, 
queryMaxNodeResource).getQueuryQueue();
 
 Review comment:
   fixed.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to