This is an automated email from the ASF dual-hosted git repository. mboehm7 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/systemds.git
commit e03ef9691d111eca82e8c8bdb0373cd40bdc361e Author: Matthias Boehm <[email protected]> AuthorDate: Sat Sep 4 22:19:24 2021 +0200 [SYSTEMDS-3122] Fix parfor degree of parallelism w/ eval functions Assume the following special-case (but increasingly common) scenario of three functions fun1, fun2, fun3, where fun1 might be, for example, hyper-parameter tuning with unknown models/functions. There was an issue where the parfor optimizer set the degree of parallelism to 112, and then tried to set all reachable program blocks and functions to a degree of parallelism (DOP) of 1. However, because it encountered an eval with an unknown function call, it recompiled all existing functions (including fun1) to DOP 1 and thus destroyed its own optimization decisions. This patch now properly freezes these decisions (for a tree of nested parfor loops) when recompiling eval functions, so that they are no longer overwritten. function fun1() parfor(i in 1:n) eval("fun2", X, y) function fun2() fun3() function fun3() X = X + 1 On the topk-cleaning pipeline enumeration until the hyper-parameter tuning for dirty baseline accuracy, this patch improved the end-to-end runtime from 51s to 11s. 
--- .../sysds/runtime/controlprogram/ParForProgramBlock.java | 11 ++++++++++- .../runtime/controlprogram/paramserv/ParamservUtils.java | 6 ++++-- .../runtime/controlprogram/parfor/opt/OptimizerRuleBased.java | 11 +++++++++-- 3 files changed, 23 insertions(+), 5 deletions(-) diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/ParForProgramBlock.java b/src/main/java/org/apache/sysds/runtime/controlprogram/ParForProgramBlock.java index 42ab8bc..a289218 100644 --- a/src/main/java/org/apache/sysds/runtime/controlprogram/ParForProgramBlock.java +++ b/src/main/java/org/apache/sysds/runtime/controlprogram/ParForProgramBlock.java @@ -320,6 +320,7 @@ public class ParForProgramBlock extends ForProgramBlock protected final boolean _monitor; protected final Level _optLogLevel; protected int _numThreads = -1; + protected boolean _fixedDOP = false; //guard for numThreads protected long _taskSize = -1; protected PTaskPartitioner _taskPartitioner = null; protected PDataPartitioner _dataPartitioner = null; @@ -471,6 +472,14 @@ public class ParForProgramBlock extends ForProgramBlock _params.put(ParForStatementBlock.PAR, String.valueOf(_numThreads)); //kept up-to-date for copies setLocalParWorkerIDs(); } + + public boolean isDegreeOfParallelismFixed() { + return _fixedDOP; + } + + public void setDegreeOfParallelismFixed(boolean flag) { + _fixedDOP = flag; + } public void setCPCaching(boolean flag) { _enableCPCaching = flag; @@ -1187,7 +1196,7 @@ public class ParForProgramBlock extends ForProgramBlock try { //create deep copies of required elements child blocks - ArrayList<ProgramBlock> cpChildBlocks = null; + ArrayList<ProgramBlock> cpChildBlocks = null; HashSet<String> fnNames = new HashSet<>(); if( USE_PB_CACHE ) { diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/paramserv/ParamservUtils.java b/src/main/java/org/apache/sysds/runtime/controlprogram/paramserv/ParamservUtils.java index da1e9f7..b25c7df 100644 --- 
a/src/main/java/org/apache/sysds/runtime/controlprogram/paramserv/ParamservUtils.java +++ b/src/main/java/org/apache/sysds/runtime/controlprogram/paramserv/ParamservUtils.java @@ -309,8 +309,10 @@ public class ParamservUtils { for (ProgramBlock pb : pbs) { if (pb instanceof ParForProgramBlock) { ParForProgramBlock pfpb = (ParForProgramBlock) pb; - pfpb.setDegreeOfParallelism(k); - recompiled |= rAssignParallelismAndRecompile(pfpb.getChildBlocks(), 1, recompiled, forceExecTypeCP); + if( !pfpb.isDegreeOfParallelismFixed() ) { + pfpb.setDegreeOfParallelism(k); + recompiled |= rAssignParallelismAndRecompile(pfpb.getChildBlocks(), 1, recompiled, forceExecTypeCP); + } } else if (pb instanceof ForProgramBlock) { recompiled |= rAssignParallelismAndRecompile(((ForProgramBlock) pb).getChildBlocks(), k, recompiled, forceExecTypeCP); } else if (pb instanceof WhileProgramBlock) { diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java index cf2c091..b2c82c1 100644 --- a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java +++ b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java @@ -1178,12 +1178,14 @@ public class OptimizerRuleBased extends Optimizer { //set parfor degree of parallelism pfpb.setDegreeOfParallelism(parforK); + pfpb.setDegreeOfParallelismFixed(true); n.setK(parforK); //distribute remaining parallelism int remainParforK = getRemainingParallelismParFor(kMax, parforK); int remainOpsK = getRemainingParallelismOps(_lkmaxCP, parforK); rAssignRemainingParallelism( n, remainParforK, remainOpsK ); + pfpb.setDegreeOfParallelismFixed(false); } else // ExecType.MR/ExecType.SPARK { @@ -1212,7 +1214,9 @@ public class OptimizerRuleBased extends Optimizer { kMax = 1; //distribute remaining parallelism and recompile parallel instructions + 
pfpb.setDegreeOfParallelismFixed(true); rAssignRemainingParallelism( n, kMax, 1 ); + pfpb.setDegreeOfParallelismFixed(false); } _numEvaluatedPlans++; @@ -1247,14 +1251,15 @@ public class OptimizerRuleBased extends Optimizer { //set parfor degree of parallelism long id = c.getID(); c.setK(tmpK); - ParForProgramBlock pfpb = (ParForProgramBlock) - _plan.getMappedProgramBlock(id); + ParForProgramBlock pfpb = (ParForProgramBlock) _plan.getMappedProgramBlock(id); pfpb.setDegreeOfParallelism(tmpK); //distribute remaining parallelism int remainParforK = getRemainingParallelismParFor(parforK, tmpK); int remainOpsK = getRemainingParallelismOps(opsK, tmpK); + pfpb.setDegreeOfParallelismFixed(true); rAssignRemainingParallelism(c, remainParforK, remainOpsK); + pfpb.setDegreeOfParallelismFixed(false); } else if( c.getNodeType() == NodeType.HOP ) { @@ -1278,6 +1283,8 @@ public class OptimizerRuleBased extends Optimizer { } //if parfor contains eval call, make unoptimized functions single-threaded + //(parent parfor program blocks have been frozen such that the following + //recompilation of all possible functions does not reset the DOP to 1) if( HopRewriteUtils.isNary(h, OpOpN.EVAL) ) { ProgramBlock pb = _plan.getMappedProgramBlock(n.getID()); pb.getProgram().getFunctionProgramBlocks(false)
