DRILL-855: Improve work assignment parallelization

Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/d929faac
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/d929faac
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/d929faac

Branch: refs/heads/master
Commit: d929faace270302c5e00e272775be98d5a8f83e1
Parents: 292765e
Author: Aditya Kishore <[email protected]>
Authored: Wed May 28 04:56:46 2014 -0700
Committer: Jacques Nadeau <[email protected]>
Committed: Mon Jun 2 09:14:42 2014 -0700

----------------------------------------------------------------------
 distribution/src/resources/drill-override.conf  |  1 +
 .../org/apache/drill/exec/ExecConstants.java    |  1 +
 .../drill/exec/physical/EndpointAffinity.java   | 20 ++++--
 .../drill/exec/planner/SimpleExecPlanner.java   |  8 +--
 .../planner/fragment/SimpleParallelizer.java    | 47 ++++++++++----
 .../drill/exec/planner/fragment/Wrapper.java    | 68 +++++++++++---------
 .../apache/drill/exec/work/foreman/Foreman.java | 29 ++-------
 .../src/main/resources/drill-module.conf        | 16 +++++
 .../drill/exec/pop/TestFragmentChecker.java     |  4 +-
 9 files changed, 116 insertions(+), 78 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d929faac/distribution/src/resources/drill-override.conf
----------------------------------------------------------------------
diff --git a/distribution/src/resources/drill-override.conf 
b/distribution/src/resources/drill-override.conf
index a9316a9..da3d094 100644
--- a/distribution/src/resources/drill-override.conf
+++ b/distribution/src/resources/drill-override.conf
@@ -87,6 +87,7 @@ drill.exec: {
   work: {
     max.width.per.endpoint: 5,
     global.max.width: 100,
+    affinity.factor: 1.2,
     executor.threads: 4
   },
   trace: {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d929faac/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index d9e0833..e66e93c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -44,6 +44,7 @@ public interface ExecConstants {
   public static final String GLOBAL_MAX_WIDTH = 
"drill.exec.work.global.max.width";
   public static final String MAX_WIDTH_PER_ENDPOINT = 
"drill.exec.work.max.width.per.endpoint";
   public static final String EXECUTOR_THREADS = 
"drill.exec.work.executor.threads";
+  public static final String AFFINITY_FACTOR = 
"drill.exec.work.affinity.factor";
   public static final String CLIENT_RPC_THREADS = 
"drill.exec.rpc.user.client.threads";
   public static final String BIT_SERVER_RPC_THREADS = 
"drill.exec.rpc.bit.server.threads";
   public static final String USER_SERVER_RPC_THREADS = 
"drill.exec.rpc.user.server.threads";

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d929faac/exec/java-exec/src/main/java/org/apache/drill/exec/physical/EndpointAffinity.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/EndpointAffinity.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/EndpointAffinity.java
index f3059ae..df31f74 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/EndpointAffinity.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/EndpointAffinity.java
@@ -19,18 +19,20 @@ package org.apache.drill.exec.physical;
 
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 
+import com.google.protobuf.TextFormat;
+
 
 public class EndpointAffinity implements Comparable<EndpointAffinity>{
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(EndpointAffinity.class);
-  
+
   private DrillbitEndpoint endpoint;
   private float affinity = 0.0f;
-  
+
   public EndpointAffinity(DrillbitEndpoint endpoint) {
     super();
     this.endpoint = endpoint;
   }
-  
+
   public EndpointAffinity(DrillbitEndpoint endpoint, float affinity) {
     super();
     this.endpoint = endpoint;
@@ -46,15 +48,19 @@ public class EndpointAffinity implements 
Comparable<EndpointAffinity>{
   public float getAffinity() {
     return affinity;
   }
-  
+
   @Override
   public int compareTo(EndpointAffinity o) {
     return Float.compare(affinity, o.affinity);
   }
-  
+
   public void addAffinity(float f){
     affinity += f;
   }
-  
-  
+
+  @Override
+  public String toString() {
+    return "EndpointAffinity [endpoint=" + 
TextFormat.shortDebugString(endpoint) + ", affinity=" + affinity + "]";
+  }
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d929faac/exec/java-exec/src/main/java/org/apache/drill/exec/planner/SimpleExecPlanner.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/SimpleExecPlanner.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/SimpleExecPlanner.java
index 4da6500..132fde7 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/SimpleExecPlanner.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/SimpleExecPlanner.java
@@ -51,9 +51,9 @@ public class SimpleExecPlanner implements ExecPlanner{
 
     int maxWidthPerEndpoint = 
context.getConfig().getInt(ExecConstants.MAX_WIDTH_PER_ENDPOINT);
 
-    return parallelizer.getFragments(new OptionList(), 
context.getCurrentEndpoint(), context.getQueryId(), 
context.getActiveEndpoints(),
-            context.getPlanReader(), fragmentRoot, planningSet, maxWidth, 
maxWidthPerEndpoint);
-
-
+    
parallelizer.setGlobalMaxWidth(maxWidth).setMaxWidthPerEndpoint(maxWidthPerEndpoint);
+    return parallelizer.getFragments(new OptionList(), 
context.getCurrentEndpoint(),
+        context.getQueryId(), context.getActiveEndpoints(), 
context.getPlanReader(), fragmentRoot, planningSet);
   }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d929faac/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
index 313a81d..d226b08 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
@@ -51,6 +51,36 @@ public class SimpleParallelizer {
   private final Materializer materializer = new Materializer();
 
   /**
+   * The maximum level of parallelization any stage of the query can do. Note 
that while this
+   * might be the number of active Drillbits, realistically, this could be 
well beyond that
+   * number if we want to do things like speed up the return of results.
+   */
+  private int globalMaxWidth;
+  public SimpleParallelizer setGlobalMaxWidth(int globalMaxWidth) {
+    this.globalMaxWidth = globalMaxWidth;
+    return this;
+  }
+
+  /**
+   * Limits the maximum level of parallelization to this factor times the 
number of Drillbits
+   */
+  private int maxWidthPerEndpoint;
+  public SimpleParallelizer setMaxWidthPerEndpoint(int maxWidthPerEndpoint) {
+    this.maxWidthPerEndpoint = maxWidthPerEndpoint;
+    return this;
+  }
+
+
+  /**
+   * Factor by which a node with endpoint affinity will be favored while 
creating assignment
+   */
+  private double affinityFactor = 1.2f;
+  public SimpleParallelizer setAffinityFactor(double affinityFactor) {
+    this.affinityFactor = affinityFactor;
+    return this;
+  }
+
+  /**
    * Generate a set of assigned fragments based on the provided planningSet. 
Do not allow parallelization stages to go
    * beyond the global max width.
    *
@@ -60,16 +90,12 @@ public class SimpleParallelizer {
    * @param reader          Tool used to read JSON plans
    * @param rootNode        The root node of the PhysicalPlan that we will 
parallelizing.
    * @param planningSet     The set of queries with collected statistics that 
we'll work with.
-   * @param globalMaxWidth  The maximum level or parallelization any stage of 
the query can do. Note that while this
-   *                        might be the number of active Drillbits, 
realistically, this could be well beyond that
-   *                        number of we want to do things like speed results 
return.
-   * @param maxWidthPerEndpoint Limits the maximum level of parallelization to 
this factor time the number of Drillbits
    * @return The list of generated PlanFragment protobuf objects to be 
assigned out to the individual nodes.
    * @throws ExecutionSetupException
    */
-  public QueryWorkUnit getFragments(OptionList options, DrillbitEndpoint 
foremanNode, QueryId queryId, Collection<DrillbitEndpoint> activeEndpoints, 
PhysicalPlanReader reader, Fragment rootNode, PlanningSet planningSet,
-                                    int globalMaxWidth, int 
maxWidthPerEndpoint) throws ExecutionSetupException {
-    assignEndpoints(activeEndpoints, planningSet, globalMaxWidth, 
maxWidthPerEndpoint);
+  public QueryWorkUnit getFragments(OptionList options, DrillbitEndpoint 
foremanNode, QueryId queryId, Collection<DrillbitEndpoint> activeEndpoints,
+      PhysicalPlanReader reader, Fragment rootNode, PlanningSet planningSet) 
throws ExecutionSetupException {
+    assignEndpoints(activeEndpoints, planningSet);
     return generateWorkUnit(options, foremanNode, queryId, reader, rootNode, 
planningSet);
   }
 
@@ -152,11 +178,9 @@ public class SimpleParallelizer {
     }
 
     return new QueryWorkUnit(rootOperator, rootFragment, fragments);
-
   }
 
-  private void assignEndpoints(Collection<DrillbitEndpoint> allNodes, 
PlanningSet planningSet,
-                               int globalMaxWidth, int maxWidthPerEndpoint) 
throws PhysicalOperatorSetupException {
+  private void assignEndpoints(Collection<DrillbitEndpoint> allNodes, 
PlanningSet planningSet) throws PhysicalOperatorSetupException {
     // First we determine the amount of parallelization for a fragment. This 
will be between 1 and maxWidth based on
     // cost. (Later could also be based on cluster operation.) then we decide 
endpoints based on affinity (later this
     // could be based on endpoint load)
@@ -181,7 +205,8 @@ public class SimpleParallelizer {
 //      logger.debug("Setting width {} on fragment {}", width, wrapper);
       wrapper.setWidth(width);
       // figure out endpoint assignments. also informs the exchanges about 
their respective endpoints.
-      wrapper.assignEndpoints(allNodes);
+      wrapper.assignEndpoints(allNodes, affinityFactor);
     }
   }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d929faac/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java
index 8602bf0..38cba09 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java
@@ -19,20 +19,24 @@ package org.apache.drill.exec.planner.fragment;
 
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ThreadLocalRandom;
 
-import org.apache.commons.lang.NotImplementedException;
 import org.apache.drill.common.exceptions.PhysicalOperatorSetupException;
-import org.apache.drill.exec.exception.FragmentSetupException;
 import org.apache.drill.exec.physical.EndpointAffinity;
-import org.apache.drill.exec.physical.base.*;
+import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor;
+import org.apache.drill.exec.physical.base.Exchange;
 import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.Store;
+import org.apache.drill.exec.physical.base.SubScan;
 import org.apache.drill.exec.planner.fragment.Fragment.ExchangeFragmentPair;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
@@ -47,7 +51,7 @@ public class Wrapper {
   private int width = -1;
   private final Stats stats;
   private boolean endpointsAssigned;
-  private Map<DrillbitEndpoint, EndpointAffinity> endpointAffinity = 
Maps.newHashMap();
+  private Map<DrillbitEndpoint, EndpointAffinity> endpointAffinityMap = 
Maps.newHashMap();
   private long initialAllocation = 0;
   private long maxAllocation = 0;
 
@@ -71,14 +75,14 @@ public class Wrapper {
       addEndpointAffinity(ea.getEndpoint(), ea.getAffinity());
     }
   }
-  
+
   public void addEndpointAffinity(DrillbitEndpoint endpoint, float affinity) {
     Preconditions.checkState(!endpointsAssigned);
     Preconditions.checkNotNull(endpoint);
-    EndpointAffinity ea = endpointAffinity.get(endpoint);
+    EndpointAffinity ea = endpointAffinityMap.get(endpoint);
     if (ea == null) {
       ea = new EndpointAffinity(endpoint);
-      endpointAffinity.put(endpoint, ea);
+      endpointAffinityMap.put(endpoint, ea);
     }
 
     ea.addAffinity(affinity);
@@ -116,7 +120,6 @@ public class Wrapper {
 
   private class AssignEndpointsToScanAndStore extends 
AbstractPhysicalVisitor<Void, List<DrillbitEndpoint>, 
PhysicalOperatorSetupException>{
 
-    
     @Override
     public Void visitExchange(Exchange exchange, List<DrillbitEndpoint> value) 
throws PhysicalOperatorSetupException {
       if(exchange == node.getSendingExchange()){
@@ -148,40 +151,42 @@ public class Wrapper {
     public Void visitOp(PhysicalOperator op, List<DrillbitEndpoint> value) 
throws PhysicalOperatorSetupException {
       return visitChildren(op, value);
     }
-    
+
   }
-  
-  public void assignEndpoints(Collection<DrillbitEndpoint> allPossible) throws 
PhysicalOperatorSetupException {
-    Preconditions.checkState(!endpointsAssigned);
 
+  public void assignEndpoints(Collection<DrillbitEndpoint> allEndpoints, 
double affinityFactor) throws PhysicalOperatorSetupException {
+    Preconditions.checkState(!endpointsAssigned);
     endpointsAssigned = true;
 
-    List<EndpointAffinity> values = Lists.newArrayList();
-    values.addAll(endpointAffinity.values());
-
-    if (values.size() == 0) {
-      List<DrillbitEndpoint> all = Lists.newArrayList(allPossible);
-      final int div = allPossible.size();
-      int start = ThreadLocalRandom.current().nextInt(div);
-      // round robin with random start.
-      for (int i = start; i < start + width; i++) {
-        Preconditions.checkNotNull(all.get(i % div));
-        endpoints.add(all.get(i % div));
-      }
-    } else {
+    if (endpointAffinityMap.size() > 0) {
+      List<EndpointAffinity> affinedEPs = 
Lists.newArrayList(endpointAffinityMap.values());
       // get nodes with highest affinity.
-      Collections.sort(values);
-      values = Lists.reverse(values);
-      for (int i = 0; i < width; i++) {
-        Preconditions.checkNotNull(values.get(i%values.size()).getEndpoint());
-        endpoints.add(values.get(i%values.size()).getEndpoint());
+      Collections.sort(affinedEPs);
+      Iterator<EndpointAffinity> affinedEPItr = 
Iterators.cycle(Lists.reverse(affinedEPs));
+      /** Maximum number of slots which should go to endpoints with affinity */
+      int affinedSlots = Math.min((Math.max(1, (int) 
(affinityFactor*width/allEndpoints.size())) * affinedEPs.size()), width);
+      while(endpoints.size() < affinedSlots) {
+        EndpointAffinity ea = affinedEPItr.next();
+        DrillbitEndpoint endpoint = ea.getEndpoint();
+        endpoints.add(endpoint);
+      }
+    }
+    // add other endpoints if required
+    if (endpoints.size() < width) {
+      List<DrillbitEndpoint> all = Lists.newArrayList(allEndpoints);
+      all.removeAll(endpointAffinityMap.keySet());
+      // shuffle the non-affined endpoints, then fill the remaining slots round robin.
+      Collections.shuffle(all, ThreadLocalRandom.current());
+      Iterator<DrillbitEndpoint> otherEPItr = Iterators.cycle(all.size() > 0 ? 
all : endpointAffinityMap.keySet());
+      while (endpoints.size() < width) {
+        endpoints.add(otherEPItr.next());
       }
     }
 
     // Set scan and store endpoints.
     AssignEndpointsToScanAndStore visitor = new 
AssignEndpointsToScanAndStore();
     node.getRoot().accept(visitor, endpoints);
-    
+
     // Set the endpoints for this (one at most) sending exchange.
     if (node.getSendingExchange() != null) {
       node.getSendingExchange().setupSenders(majorFragmentId, endpoints);
@@ -202,4 +207,5 @@ public class Wrapper {
     Preconditions.checkState(endpointsAssigned);
     return this.endpoints.get(minorFragmentId);
   }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d929faac/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index eb1d738..b8edb84 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -121,7 +121,6 @@ public class Foreman implements Runnable, Closeable, 
Comparable<Object>{
     cleanupAndSendResult(result);
   }
 
-
   public void cancel() {
     if(isFinished()){
       return;
@@ -148,8 +147,6 @@ public class Foreman implements Runnable, Closeable, 
Comparable<Object>{
     }
   }
 
-
-
   /**
    * Called by execution pool to do foreman setup. Actual query execution is a 
separate phase (and can be scheduled).
    */
@@ -160,10 +157,8 @@ public class Foreman implements Runnable, Closeable, 
Comparable<Object>{
     // convert a run query request into action
     try{
       switch (queryRequest.getType()) {
-
       case LOGICAL:
         parseAndRunLogicalPlan(queryRequest.getPlan());
-
         break;
       case PHYSICAL:
         parseAndRunPhysicalPlan(queryRequest.getPlan());
@@ -185,7 +180,6 @@ public class Foreman implements Runnable, Closeable, 
Comparable<Object>{
     }
   }
 
-
   private void parseAndRunLogicalPlan(String json) {
 
     try {
@@ -249,8 +243,6 @@ public class Foreman implements Runnable, Closeable, 
Comparable<Object>{
 
   }
 
-
-
   private void parseAndRunPhysicalPlan(String json) {
     try {
       PhysicalPlan plan = context.getPlanReader().readPhysicalPlan(json);
@@ -260,7 +252,6 @@ public class Foreman implements Runnable, Closeable, 
Comparable<Object>{
     }
   }
 
-
   private void runPhysicalPlan(PhysicalPlan plan) {
 
     if(plan.getProperties().resultMode != ResultMode.EXEC){
@@ -278,12 +269,14 @@ public class Foreman implements Runnable, Closeable, 
Comparable<Object>{
     }
 
     PlanningSet planningSet = StatsCollector.collectStats(rootFragment);
-    SimpleParallelizer parallelizer = new SimpleParallelizer();
+    SimpleParallelizer parallelizer = new SimpleParallelizer()
+      
.setGlobalMaxWidth(context.getConfig().getInt(ExecConstants.GLOBAL_MAX_WIDTH))
+      
.setMaxWidthPerEndpoint(context.getConfig().getInt(ExecConstants.MAX_WIDTH_PER_ENDPOINT))
+      
.setAffinityFactor(context.getConfig().getDouble(ExecConstants.AFFINITY_FACTOR));
 
     try {
-      QueryWorkUnit work = 
parallelizer.getFragments(context.getOptions().getSessionOptionList(), 
context.getCurrentEndpoint(), queryId, context.getActiveEndpoints(),
-              context.getPlanReader(), rootFragment, planningSet, 
context.getConfig().getInt(ExecConstants.GLOBAL_MAX_WIDTH),
-              
context.getConfig().getInt(ExecConstants.MAX_WIDTH_PER_ENDPOINT));
+      QueryWorkUnit work = 
parallelizer.getFragments(context.getOptions().getSessionOptionList(), 
context.getCurrentEndpoint(),
+          queryId, context.getActiveEndpoints(), context.getPlanReader(), 
rootFragment, planningSet);
 
       
this.context.getWorkBus().setFragmentStatusListener(work.getRootFragment().getHandle().getQueryId(),
 fragmentManager);
       List<PlanFragment> leafFragments = Lists.newArrayList();
@@ -292,17 +285,13 @@ public class Foreman implements Runnable, Closeable, 
Comparable<Object>{
       // store fragments in distributed grid.
       logger.debug("Storing fragments");
       for (PlanFragment f : work.getFragments()) {
-
         // store all fragments in grid since they are part of handshake.
-
         context.getCache().storeFragment(f);
         if (f.getLeafFragment()) {
           leafFragments.add(f);
         } else {
           intermediateFragments.add(f);
         }
-
-
       }
 
       logger.debug("Fragments stored.");
@@ -311,7 +300,6 @@ public class Foreman implements Runnable, Closeable, 
Comparable<Object>{
       fragmentManager.runFragments(bee, work.getRootFragment(), 
work.getRootOperator(), initiatingClient, leafFragments, intermediateFragments);
       logger.debug("Fragments running.");
 
-
     } catch (ExecutionSetupException | RpcException e) {
       fail("Failure while setting up query.", e);
     }
@@ -352,7 +340,6 @@ public class Foreman implements Runnable, Closeable, 
Comparable<Object>{
     return this.state.getState();
   }
 
-
   class ForemanManagerListener{
     void fail(String message, Throwable t) {
       ForemanManagerListener.this.fail(message, t);
@@ -364,13 +351,9 @@ public class Foreman implements Runnable, Closeable, 
Comparable<Object>{
 
   }
 
-
-
   @Override
   public int compareTo(Object o) {
     return o.hashCode() - o.hashCode();
   }
 
-
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d929faac/exec/java-exec/src/main/resources/drill-module.conf
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/resources/drill-module.conf 
b/exec/java-exec/src/main/resources/drill-module.conf
index 26205bd..f8396bb 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -1,3 +1,18 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
 //  This file tells Drill to consider this module when class path scanning.  
 //  This file can also include any supplementary configuration information.  
 //  This file is in HOCON format, see 
https://github.com/typesafehub/config/blob/master/HOCON.md for more information.
@@ -76,6 +91,7 @@ drill.exec: {
   work: {
     max.width.per.endpoint: 5,
     global.max.width: 100,
+    affinity.factor: 1.2,
     executor.threads: 4
   },
   trace: {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d929faac/exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmentChecker.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmentChecker.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmentChecker.java
index 1b38dce..ea19351 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmentChecker.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmentChecker.java
@@ -59,8 +59,8 @@ public class TestFragmentChecker extends PopUnitTestBase{
       endpoints.add(b1);
     }
 
-
-    QueryWorkUnit qwu = par.getFragments(new OptionList(), localBit, 
QueryId.getDefaultInstance(), endpoints, ppr, fragmentRoot, planningSet, 10, 5);
+    par.setGlobalMaxWidth(10).setMaxWidthPerEndpoint(5);
+    QueryWorkUnit qwu = par.getFragments(new OptionList(), localBit, 
QueryId.getDefaultInstance(), endpoints, ppr, fragmentRoot, planningSet);
     System.out.println(String.format("=========ROOT FRAGMENT [%d:%d] 
=========", qwu.getRootFragment().getHandle().getMajorFragmentId(), 
qwu.getRootFragment().getHandle().getMinorFragmentId()));
 
     System.out.print(qwu.getRootFragment().getFragmentJson());

Reply via email to