DRILL-235: Add configuration parameters to control number of threads and width of queries
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/627c84e1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/627c84e1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/627c84e1 Branch: refs/heads/master Commit: 627c84e14bb87384a4871790d3e38ede02480768 Parents: 82a7b8f Author: Steven Phillips <[email protected]> Authored: Thu Sep 5 20:00:02 2013 -0700 Committer: Steven Phillips <[email protected]> Committed: Fri Sep 20 12:10:07 2013 -0700 ---------------------------------------------------------------------- distribution/src/resources/drill-override.conf | 43 +++++++++---- .../org/apache/drill/exec/ExecConstants.java | 14 +++-- .../apache/drill/exec/client/DrillClient.java | 2 +- .../drill/exec/planner/SimpleExecPlanner.java | 6 +- .../planner/fragment/SimpleParallelizer.java | 9 ++- .../drill/exec/planner/fragment/Wrapper.java | 1 - .../drill/exec/server/BootStrapContext.java | 2 +- .../drill/exec/service/ServiceEngine.java | 3 +- .../org/apache/drill/exec/work/WorkManager.java | 5 +- .../apache/drill/exec/work/foreman/Foreman.java | 5 +- .../src/main/resources/drill-module.conf | 34 ++++++++--- .../drill/exec/pop/TestFragmentChecker.java | 2 +- .../src/test/resources/drill-module.conf | 63 +++++++++++++------- 13 files changed, 133 insertions(+), 56 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/627c84e1/distribution/src/resources/drill-override.conf ---------------------------------------------------------------------- diff --git a/distribution/src/resources/drill-override.conf b/distribution/src/resources/drill-override.conf index fb1b627..c2ed9df 100644 --- a/distribution/src/resources/drill-override.conf +++ b/distribution/src/resources/drill-override.conf @@ -19,8 +19,26 @@ drill.exec: { cluster-id: "drillbits1" rpc: { - user.port : 31010, - 
bit.port : 32011 + user: { + server: { + port: 31010 + threads: 1 + } + client: { + threads: 1 + } + }, + bit: { + server: { + port : 31011, + retry:{ + count: 7200, + delay: 500 + }, + threads: 1 + } + }, + use.ip : false }, operator: { packages += "org.apache.drill.exec.physical.config" @@ -32,20 +50,25 @@ drill.exec: { packages += "org.apache.drill.exec.store" } metrics : { - context: "drillbit" + context: "drillbit" }, zk: { connect: "localhost:2181", root: "/drill", refresh: 500, timeout: 5000, - retry: { - count: 7200, - delay: 500 - } - } - + retry: { + count: 7200, + delay: 500 + } + }, + functions: ["org.apache.drill.expr.fn.impl"], network: { start: 35000 + }, + work: { + max.width.per.endpoint: 5, + global.max.width: 100, + executor.threads: 4 } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/627c84e1/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java index 42abf54..72776d1 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java @@ -24,17 +24,23 @@ public interface ExecConstants { public static final String ZK_TIMEOUT = "drill.exec.zk.timeout"; public static final String ZK_ROOT = "drill.exec.zk.root"; public static final String ZK_REFRESH = "drill.exec.zk.refresh"; - public static final String BIT_RETRY_TIMES = "drill.exec.bit.retry.count"; - public static final String BIT_RETRY_DELAY = "drill.exec.bit.retry.delay"; + public static final String BIT_RETRY_TIMES = "drill.exec.rpc.bit.server.retry.count"; + public static final String BIT_RETRY_DELAY = "drill.exec.rpc.bit.server.retry.delay"; public static final String BIT_TIMEOUT = "drill.exec.bit.timeout" ; public 
static final String STORAGE_ENGINE_SCAN_PACKAGES = "drill.exec.storage.packages"; public static final String SERVICE_NAME = "drill.exec.cluster-id"; - public static final String INITIAL_BIT_PORT = "drill.exec.rpc.bit.port"; - public static final String INITIAL_USER_PORT = "drill.exec.rpc.user.port"; + public static final String INITIAL_BIT_PORT = "drill.exec.rpc.bit.server.port"; + public static final String INITIAL_USER_PORT = "drill.exec.rpc.user.server.port"; public static final String METRICS_CONTEXT_NAME = "drill.exec.metrics.context"; public static final String FUNCTION_PACKAGES = "drill.exec.functions"; public static final String USE_IP_ADDRESS = "drill.exec.rpc.use.ip"; public static final String METRICS_JMX_OUTPUT_ENABLED = "drill.exec.metrics.jmx.enabled"; public static final String METRICS_LOG_OUTPUT_ENABLED = "drill.exec.metrics.log.enabled"; public static final String METRICS_LOG_OUTPUT_INTERVAL = "drill.exec.metrics.log.interval"; + public static final String GLOBAL_MAX_WIDTH = "drill.exec.work.global.max.width"; + public static final String MAX_WIDTH_PER_ENDPOINT = "drill.exec.work.max.width.per.endpoint"; + public static final String EXECUTOR_THREADS = "drill.exec.work.executor.threads"; + public static final String CLIENT_RPC_THREADS = "drill.exec.rpc.user.client.threads"; + public static final String BIT_SERVER_RPC_THREADS = "drill.exec.rpc.bit.server.threads"; + public static final String USER_SERVER_RPC_THREADS = "drill.exec.rpc.user.server.threads"; } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/627c84e1/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java index 1c05e14..3dadb0c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java +++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java @@ -106,7 +106,7 @@ public class DrillClient implements Closeable{ checkState(!endpoints.isEmpty(), "No DrillbitEndpoint can be found"); // just use the first endpoint for now DrillbitEndpoint endpoint = endpoints.iterator().next(); - this.client = new UserClient(allocator.getUnderlyingAllocator(), new NioEventLoopGroup(1, new NamedThreadFactory("Client-"))); + this.client = new UserClient(allocator.getUnderlyingAllocator(), new NioEventLoopGroup(config.getInt(ExecConstants.CLIENT_RPC_THREADS), new NamedThreadFactory("Client-"))); logger.debug("Connecting to server {}:{}", endpoint.getAddress(), endpoint.getUserPort()); connect(endpoint); connected = true; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/627c84e1/exec/java-exec/src/main/java/org/apache/drill/exec/planner/SimpleExecPlanner.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/SimpleExecPlanner.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/SimpleExecPlanner.java index 78c455d..dff2dd3 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/SimpleExecPlanner.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/SimpleExecPlanner.java @@ -18,6 +18,7 @@ package org.apache.drill.exec.planner; import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.ops.QueryContext; import org.apache.drill.exec.physical.PhysicalPlan; import org.apache.drill.exec.physical.base.PhysicalOperator; @@ -46,8 +47,11 @@ public class SimpleExecPlanner implements ExecPlanner{ // generate a planning set and collect stats. 
PlanningSet planningSet = StatsCollector.collectStats(fragmentRoot); + + int maxWidthPerEndpoint = context.getConfig().getInt(ExecConstants.MAX_WIDTH_PER_ENDPOINT); - return parallelizer.getFragments(context.getCurrentEndpoint(), context.getQueryId(), context.getActiveEndpoints(), context.getPlanReader(), fragmentRoot, planningSet, maxWidth); + return parallelizer.getFragments(context.getCurrentEndpoint(), context.getQueryId(), context.getActiveEndpoints(), + context.getPlanReader(), fragmentRoot, planningSet, maxWidth, maxWidthPerEndpoint); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/627c84e1/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java index fd1984c..30a3d5a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java @@ -60,12 +60,13 @@ public class SimpleParallelizer { * @param globalMaxWidth The maximum level of parallelization any stage of the query can do. Note that while this * might be the number of active Drillbits, realistically, this could be well beyond that * number if we want to do things like speed results return. + * @param maxWidthPerEndpoint Limits the maximum level of parallelization to this factor times the number of Drillbits * @return The list of generated PlanFragment protobuf objects to be assigned out to the individual nodes. 
* @throws ExecutionSetupException */ public QueryWorkUnit getFragments(DrillbitEndpoint foremanNode, QueryId queryId, Collection<DrillbitEndpoint> activeEndpoints, PhysicalPlanReader reader, Fragment rootNode, PlanningSet planningSet, - int globalMaxWidth) throws ExecutionSetupException { - assignEndpoints(activeEndpoints, planningSet, globalMaxWidth); + int globalMaxWidth, int maxWidthPerEndpoint) throws ExecutionSetupException { + assignEndpoints(activeEndpoints, planningSet, globalMaxWidth, maxWidthPerEndpoint); return generateWorkUnit(foremanNode, queryId, reader, rootNode, planningSet); } @@ -142,7 +143,7 @@ public class SimpleParallelizer { } private void assignEndpoints(Collection<DrillbitEndpoint> allNodes, PlanningSet planningSet, - int globalMaxWidth) throws PhysicalOperatorSetupException { + int globalMaxWidth, int maxWidthPerEndpoint) throws PhysicalOperatorSetupException { // First we determine the amount of parallelization for a fragment. This will be between 1 and maxWidth based on // cost. (Later could also be based on cluster operation.) 
then we decide endpoints based on affinity (later this // could be based on endpoint load) @@ -161,6 +162,8 @@ public class SimpleParallelizer { width = (int) diskCost; } + width = Math.min(width, maxWidthPerEndpoint*allNodes.size()); + if (width < 1) width = 1; // logger.debug("Setting width {} on fragment {}", width, wrapper); wrapper.setWidth(width); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/627c84e1/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java index 8b0cc91..8c1487c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java @@ -152,7 +152,6 @@ public class Wrapper { endpoints.add(all.get(i % div)); } } else { - width = Math.min(width, values.size()*5); // get nodes with highest affinity. 
Collections.sort(values); values = Lists.reverse(values); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/627c84e1/exec/java-exec/src/main/java/org/apache/drill/exec/server/BootStrapContext.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/BootStrapContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/BootStrapContext.java index 1b10ef0..12f42c2 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/BootStrapContext.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/BootStrapContext.java @@ -39,7 +39,7 @@ public class BootStrapContext implements Closeable{ public BootStrapContext(DrillConfig config) { super(); this.config = config; - this.loop = new NioEventLoopGroup(1, new NamedThreadFactory("BitServer-")); + this.loop = new NioEventLoopGroup(config.getInt(ExecConstants.BIT_SERVER_RPC_THREADS), new NamedThreadFactory("BitServer-")); this.metrics = new MetricRegistry(config.getString(ExecConstants.METRICS_CONTEXT_NAME)); this.allocator = BufferAllocator.getAllocator(config); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/627c84e1/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java b/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java index bc0d171..8406deb 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java @@ -47,7 +47,8 @@ public class ServiceEngine implements Closeable{ boolean useIP = false; public ServiceEngine(BitComHandler bitComWorker, UserWorker userWorker, BootStrapContext context){ - this.userServer = new 
UserServer(context.getAllocator().getUnderlyingAllocator(), new NioEventLoopGroup(1, new NamedThreadFactory("UserServer-")), userWorker); + this.userServer = new UserServer(context.getAllocator().getUnderlyingAllocator(), new NioEventLoopGroup(context.getConfig().getInt(ExecConstants.USER_SERVER_RPC_THREADS), + new NamedThreadFactory("UserServer-")), userWorker); this.bitCom = new BitComImpl(context, bitComWorker); this.config = context.getConfig(); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/627c84e1/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java index ab9c16b..9a82c62 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java @@ -28,6 +28,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.TimeUnit; +import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.cache.DistributedCache; import org.apache.drill.exec.coord.ClusterCoordinator; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; @@ -63,7 +64,7 @@ public class WorkManager implements Closeable{ private final BitComHandler bitComWorker; private final UserWorker userWorker; private final WorkerBee bee; - private Executor executor = Executors.newFixedThreadPool(4, new NamedThreadFactory("WorkManager-")); + private Executor executor; private final EventThread eventThread; public WorkManager(BootStrapContext context){ @@ -72,11 +73,11 @@ public class WorkManager implements Closeable{ this.bitComWorker = new BitComHandlerImpl(bee); this.userWorker = new UserWorker(bee); this.eventThread = new EventThread(); - } public void 
start(DrillbitEndpoint endpoint, DistributedCache cache, BitCom com, ClusterCoordinator coord){ this.dContext = new DrillbitContext(endpoint, bContext, coord, com, cache); + executor = Executors.newFixedThreadPool(dContext.getConfig().getInt(ExecConstants.EXECUTOR_THREADS), new NamedThreadFactory("WorkManager-")); eventThread.start(); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/627c84e1/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java index 5e3302e..726539a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java @@ -24,6 +24,7 @@ import java.util.List; import org.apache.drill.common.config.DrillConfig; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.logical.LogicalPlan; +import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.exception.FragmentSetupException; import org.apache.drill.exec.exception.OptimizerException; import org.apache.drill.exec.ops.QueryContext; @@ -209,7 +210,9 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{ SimpleParallelizer parallelizer = new SimpleParallelizer(); try { - QueryWorkUnit work = parallelizer.getFragments(context.getCurrentEndpoint(), queryId, context.getActiveEndpoints(), context.getPlanReader(), rootFragment, planningSet, 10); + QueryWorkUnit work = parallelizer.getFragments(context.getCurrentEndpoint(), queryId, context.getActiveEndpoints(), + context.getPlanReader(), rootFragment, planningSet, context.getConfig().getInt(ExecConstants.GLOBAL_MAX_WIDTH), + context.getConfig().getInt(ExecConstants.MAX_WIDTH_PER_ENDPOINT)); 
this.context.getBitCom().getListeners().addFragmentStatusListener(work.getRootFragment().getHandle(), fragmentManager); List<PlanFragment> leafFragments = Lists.newArrayList(); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/627c84e1/exec/java-exec/src/main/resources/drill-module.conf ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf index 41282e5..7e40a05 100644 --- a/exec/java-exec/src/main/resources/drill-module.conf +++ b/exec/java-exec/src/main/resources/drill-module.conf @@ -7,8 +7,25 @@ drill.logical.function.packages += "org.apache.drill.exec.expr.fn.impl" drill.exec: { cluster-id: "drillbits1" rpc: { - user.port : 31010, - bit.port : 31011, + user: { + server: { + port: 31010 + threads: 1 + } + client: { + threads: 1 + } + }, + bit: { + server: { + port : 31011, + retry:{ + count: 7200, + delay: 500 + }, + threads: 1 + } + }, use.ip : false }, operator: { @@ -20,7 +37,7 @@ drill.exec: { functions: ["org.apache.drill.expr.fn.impl"], storage: { packages += "org.apache.drill.exec.store" - } + }, metrics : { context: "drillbit", jmx: { @@ -42,13 +59,12 @@ drill.exec: { } }, functions: ["org.apache.drill.expr.fn.impl"], - bit: { - retry:{ - count: 7200, - delay: 500 - } - } , network: { start: 35000 + }, + work: { + max.width.per.endpoint: 5, + global.max.width: 100, + executor.threads: 4 } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/627c84e1/exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmentChecker.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmentChecker.java b/exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmentChecker.java index 7bb90ee..6e0bfa3 100644 --- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmentChecker.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmentChecker.java @@ -61,7 +61,7 @@ public class TestFragmentChecker extends PopUnitTestBase{ } - QueryWorkUnit qwu = par.getFragments(localBit, QueryId.getDefaultInstance(), endpoints, ppr, fragmentRoot, planningSet, 10); + QueryWorkUnit qwu = par.getFragments(localBit, QueryId.getDefaultInstance(), endpoints, ppr, fragmentRoot, planningSet, 10, 5); System.out.println(String.format("=========ROOT FRAGMENT [%d:%d] =========", qwu.getRootFragment().getHandle().getMajorFragmentId(), qwu.getRootFragment().getHandle().getMinorFragmentId())); System.out.print(qwu.getRootFragment().getFragmentJson()); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/627c84e1/exec/java-exec/src/test/resources/drill-module.conf ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/resources/drill-module.conf b/exec/java-exec/src/test/resources/drill-module.conf index 744b786..99dd863 100644 --- a/exec/java-exec/src/test/resources/drill-module.conf +++ b/exec/java-exec/src/test/resources/drill-module.conf @@ -1,11 +1,32 @@ -// This file tells Drill to consider this module when class path scanning. -// This file can also include any supplementary configuration information. +// This file tells Drill to consider this module when class path scanning. +// This file can also include any supplementary configuration information. // This file is in HOCON format, see https://github.com/typesafehub/config/blob/master/HOCON.md for more information. 
+ +drill.logical.function.packages += "org.apache.drill.exec.expr.fn.impl" + drill.exec: { cluster-id: "drillbits1" rpc: { - user.port : 31010, - bit.port : 32010 + user: { + server: { + port: 31010 + threads: 1 + } + client: { + threads: 1 + } + }, + bit: { + server: { + port : 31011, + retry:{ + count: 7200, + delay: 500 + }, + threads: 1 + } + }, + use.ip : false }, operator: { packages += "org.apache.drill.exec.physical.config" @@ -13,30 +34,30 @@ drill.exec: { optimizer: { implementation: "org.apache.drill.exec.opt.IdentityOptimizer" }, + functions: ["org.apache.drill.expr.fn.impl"], storage: { - packages += "org.apache.drill.exec.store" - } - metrics : { - context: "drillbit" + packages += "org.apache.drill.exec.store" + }, + metrics : { + context: "drillbit" }, zk: { - connect: "10.10.30.52:5181", + connect: "localhost:2181", root: "/drill", refresh: 500, timeout: 5000, - retry: { - count: 7200, - delay: 500 - } - } - bit: { - retry:{ - count: 7200, - delay: 500 - } - } , - + retry: { + count: 7200, + delay: 500 + } + }, + functions: ["org.apache.drill.expr.fn.impl"], network: { start: 35000 + }, + work: { + max.width.per.endpoint: 5, + global.max.width: 100, + executor.threads: 4 } }
