This is an automated email from the ASF dual-hosted git repository.

sunithabeeram pushed a commit to branch planningPhase in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 061fa3ca98f4e691b5069ca5e87ba1ec8f53833c Author: Sunitha Beeram <[email protected]> AuthorDate: Mon Nov 12 22:49:53 2018 -0800 [PINOT-7328] Reduce lock contention in physical planning phase by reducing the total number of tasks --- .../linkedin/pinot/core/plan/CombinePlanNode.java | 22 ++++++++++------ .../pinot/core/plan/CombinePlanNodeTest.java | 30 ++++++++++++++++++++++ 2 files changed, 44 insertions(+), 8 deletions(-) diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/plan/CombinePlanNode.java b/pinot-core/src/main/java/com/linkedin/pinot/core/plan/CombinePlanNode.java index 72f04c1..bd03a94 100644 --- a/pinot-core/src/main/java/com/linkedin/pinot/core/plan/CombinePlanNode.java +++ b/pinot-core/src/main/java/com/linkedin/pinot/core/plan/CombinePlanNode.java @@ -26,6 +26,7 @@ import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import org.apache.zookeeper.Op; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,7 +37,7 @@ import org.slf4j.LoggerFactory; public class CombinePlanNode implements PlanNode { private static final Logger LOGGER = LoggerFactory.getLogger(CombinePlanNode.class); - private static final int NUM_PLAN_NODES_THRESHOLD_FOR_PARALLEL_RUN = 10; + private static final int MAX_PLAN_TASKS = Math.min(10, (int) (Runtime.getRuntime().availableProcessors() * .5)); private static final int TIME_OUT_IN_MILLISECONDS_FOR_PARALLEL_RUN = 10_000; private final List<PlanNode> _planNodes; @@ -67,7 +68,7 @@ public class CombinePlanNode implements PlanNode { int numPlanNodes = _planNodes.size(); List<Operator> operators = new ArrayList<>(numPlanNodes); - if (numPlanNodes < NUM_PLAN_NODES_THRESHOLD_FOR_PARALLEL_RUN) { + if (numPlanNodes < MAX_PLAN_TASKS) { // Small number of plan nodes, run them sequentially for (PlanNode planNode : _planNodes) { operators.add(planNode.run()); @@ -79,13 +80,17 @@ public class CombinePlanNode implements PlanNode { 
long endTime = System.currentTimeMillis() + TIME_OUT_IN_MILLISECONDS_FOR_PARALLEL_RUN; // Submit all jobs - Future[] futures = new Future[numPlanNodes]; - for (int i = 0; i < numPlanNodes; i++) { + Future[] futures = new Future[MAX_PLAN_TASKS]; + for (int i = 0; i < MAX_PLAN_TASKS; i++) { final int index = i; - futures[i] = _executorService.submit(new TraceCallable<Operator>() { + futures[i] = _executorService.submit(new TraceCallable<List<Operator>>() { @Override - public Operator callJob() throws Exception { - return _planNodes.get(index).run(); + public List<Operator> callJob() throws Exception { + List<Operator> operators = new ArrayList<>(); + for(int count = index; count < numPlanNodes; count = count + MAX_PLAN_TASKS) { + operators.add(_planNodes.get(count).run()); + } + return operators; } }); } @@ -93,7 +98,8 @@ public class CombinePlanNode implements PlanNode { // Get all results try { for (Future future : futures) { - operators.add((Operator) future.get(endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS)); + List<Operator> ops = (List<Operator>) future.get(endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS); + operators.addAll(ops); } } catch (Exception e) { // Future object will throw ExecutionException for execution exception, need to check the cause to determine diff --git a/pinot-core/src/test/java/com/linkedin/pinot/core/plan/CombinePlanNodeTest.java b/pinot-core/src/test/java/com/linkedin/pinot/core/plan/CombinePlanNodeTest.java index b27dc94..b79c825 100644 --- a/pinot-core/src/test/java/com/linkedin/pinot/core/plan/CombinePlanNodeTest.java +++ b/pinot-core/src/test/java/com/linkedin/pinot/core/plan/CombinePlanNodeTest.java @@ -15,12 +15,14 @@ */ package com.linkedin.pinot.core.plan; +import com.linkedin.pinot.common.request.BrokerRequest; import com.linkedin.pinot.core.common.Operator; import com.linkedin.pinot.core.plan.maker.InstancePlanMakerImplV2; import java.util.ArrayList; import java.util.List; import 
java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicInteger; import junit.framework.Assert; import org.testng.annotations.Test; @@ -28,6 +30,34 @@ import org.testng.annotations.Test; public class CombinePlanNodeTest { private ExecutorService _executorService = Executors.newFixedThreadPool(10); + /** + * Tests that the tasks are executed as expected in parallel mode. + */ + @Test + public void testParallelExecution() { + AtomicInteger count = new AtomicInteger(0); + int numPlans = 42; + List<PlanNode> planNodes = new ArrayList<>(); + for (int i = 0; i < numPlans; i++) { + planNodes.add(new PlanNode() { + @Override + public Operator run() { + count.incrementAndGet(); + return null; + } + + @Override + public void showTree(String prefix) { + } + }); + } + CombinePlanNode combinePlanNode = + new CombinePlanNode(planNodes, new BrokerRequest(), _executorService, 1000, + InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT); + combinePlanNode.run(); + Assert.assertEquals(numPlans, count.get()); + } + @Test public void testSlowPlanNode() { // Warning: this test is slow (take 10 seconds). --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
