[
https://issues.apache.org/jira/browse/DRILL-4446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15177038#comment-15177038
]
ASF GitHub Bot commented on DRILL-4446:
---------------------------------------
Github user laurentgo commented on a diff in the pull request:
https://github.com/apache/drill/pull/403#discussion_r54829311
--- Diff:
exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/HardAffinityFragmentParallelizer.java
---
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.fragment;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.physical.PhysicalOperatorSetupException;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+/**
+ * Implementation of {@link FragmentParallelizer} where fragment requires
running on a given set of endpoints. Width
+ * per node is depended on the affinity to the endpoint and total width
(calculated using costs)
+ */
+public class HardAffinityFragmentParallelizer implements
FragmentParallelizer {
+ public static final HardAffinityFragmentParallelizer INSTANCE = new
HardAffinityFragmentParallelizer();
+
+ private HardAffinityFragmentParallelizer() { /* singleton */}
+
+ @Override
+ public void parallelizeFragment(final Wrapper fragmentWrapper, final
ParallelizationParameters parameters,
+ final Collection<DrillbitEndpoint> activeEndpoints) throws
PhysicalOperatorSetupException {
+
+ final Stats stats = fragmentWrapper.getStats();
+ final ParallelizationInfo pInfo = stats.getParallelizationInfo();
+
+ // Go through the affinity map and extract the endpoints that have
mandatory assignment requirement
+ final Map<DrillbitEndpoint, EndpointAffinity> endpointPool =
Maps.newHashMap();
+ for(Entry<DrillbitEndpoint, EndpointAffinity> entry :
pInfo.getEndpointAffinityMap().entrySet()) {
+ if (entry.getValue().isAssignmentRequired()) {
+ endpointPool.put(entry.getKey(), entry.getValue());
+ }
+ }
+
+ // Step 1: Find the width taking into various parameters
+ // 1.1. Find the parallelization based on cost. Use max cost of all
operators in this fragment; this is consistent
+ // with the calculation that ExcessiveExchangeRemover uses.
+ int width = (int) Math.ceil(stats.getMaxCost() /
parameters.getParallelizationThreshold());
+
+ // 1.2. Make sure the width is at least the number of endpoints that
require an assignment
+ width = Math.max(endpointPool.size(), width);
+
+ // 1.3. Cap the parallelization width by fragment level width limit
and system level per query width limit
+ width = Math.max(1, Math.min(width, pInfo.getMaxWidth()));
+ checkAndThrow(endpointPool.size() <= width,
+ "Number of mandatory endpoints that require an assignment is more
than the allowed fragment max width.");
+
+ width = Math.max(1, Math.min(width, parameters.getMaxGlobalWidth()));
+ checkAndThrow(endpointPool.size() <= width,
+ "Number of mandatory endpoints that require an assignment is more
than the allowed global query width.");
+
+ // Step 2: Select the endpoints
+ final Map<DrillbitEndpoint, Integer> endpoints = Maps.newHashMap();
+
+ // 2.1 First add each endpoint from the pool once so that the
mandatory assignment requirement is fulfilled.
+ int totalAssigned;
+ for(Entry<DrillbitEndpoint, EndpointAffinity> entry :
endpointPool.entrySet()) {
+ endpoints.put(entry.getKey(), 1);
+ }
+ totalAssigned = endpoints.size();
--- End diff --
maybe you can declare totalAssigned here?
> Improve current fragment parallelization module
> -----------------------------------------------
>
> Key: DRILL-4446
> URL: https://issues.apache.org/jira/browse/DRILL-4446
> Project: Apache Drill
> Issue Type: New Feature
> Affects Versions: 1.5.0
> Reporter: Venki Korukanti
> Assignee: Venki Korukanti
> Fix For: 1.6.0
>
>
> Current fragment parallelizer {{SimpleParallelizer.java}} can’t handle
> correctly the case where an operator has mandatory scheduling requirement for
> a set of DrillbitEndpoints and affinity for each DrillbitEndpoint (i.e how
> much portion of the total tasks to be scheduled on each DrillbitEndpoint). It
> assumes that scheduling requirements are soft (except one case where Mux and
> DeMux case where mandatory parallelization requirement of 1 unit).
> An example is:
> Cluster has 3 nodes running Drillbits and storage service on each. Data for a
> table is only present at storage services in two nodes. So a GroupScan needs
> to be scheduled on these two nodes in order to read the data. Storage service
> doesn't support (or costly) reading data from remote node.
> Inserting the mandatory scheduling requirements within existing
> SimpleParallelizer is not sufficient as you may end up with a plan that has a
> fragment with two GroupScans each having its own hard parallelization
> requirements.
> Proposal is:
> Add a property to each operator which tells what parallelization
> implementation to use. Most operators don't have any particular strategy
> (such as Project or Filter), they depend on incoming operator. Current
> existing operators which have requirements (all existing GroupScans) default
> to current parallelizer {{SimpleParallelizer}}. {{Screen}} defaults to new
> mandatory assignment parallelizer. It is possible that PhysicalPlan generated
> can have a fragment with operators having different parallelization
> strategies. In that case an exchange is inserted in between operators where a
> change in parallelization strategy is required.
> Will send a detailed design doc.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)