This is an automated email from the ASF dual-hosted git repository.

mboehm7 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/master by this push:
     new b3511d2  [SYSTEMDS-2576] Fix dop unoptimized functions 
(parfor-eval/parmserv)
b3511d2 is described below

commit b3511d216524a60674daae211806186ffcf371c8
Author: Matthias Boehm <[email protected]>
AuthorDate: Fri Jul 24 15:34:50 2020 +0200

    [SYSTEMDS-2576] Fix dop unoptimized functions (parfor-eval/parmserv)
    
    This patch fixes the degree of parallelism of unoptimized functions as
    called in second-order functions such as eval and paramserv to avoid
    excessive CPU over-commitment. For example, on a box with 32 threads,
    paramserv would run 32 local workers and each worker use 32 threads for
    individual operations (1024 threads total).
---
 .../controlprogram/paramserv/ParamservUtils.java   |  7 +++--
 .../parfor/opt/OptTreePlanMappingAbstract.java     |  4 +++
 .../parfor/opt/OptimizerRuleBased.java             | 24 ++++++++++-----
 .../test/functions/misc/FunctionPotpourriTest.java |  7 +++++
 .../misc/FunPotpourriNestedParforEval.dml          | 36 ++++++++++++++++++++++
 5 files changed, 67 insertions(+), 11 deletions(-)

diff --git 
a/src/main/java/org/apache/sysds/runtime/controlprogram/paramserv/ParamservUtils.java
 
b/src/main/java/org/apache/sysds/runtime/controlprogram/paramserv/ParamservUtils.java
index 699b72f..968cb1d 100644
--- 
a/src/main/java/org/apache/sysds/runtime/controlprogram/paramserv/ParamservUtils.java
+++ 
b/src/main/java/org/apache/sysds/runtime/controlprogram/paramserv/ParamservUtils.java
@@ -218,10 +218,11 @@ public class ParamservUtils {
        {
                Program prog = ec.getProgram();
 
-               // 1. Recompile the internal program blocks
+               // 1. Recompile the internal program blocks 
                recompileProgramBlocks(k, prog.getProgramBlocks());
                // 2. Recompile the imported function blocks
-               prog.getFunctionProgramBlocks().forEach((fname, fvalue) -> 
recompileProgramBlocks(k, fvalue.getChildBlocks()));
+               prog.getFunctionProgramBlocks(false)
+                       .forEach((fname, fvalue) -> recompileProgramBlocks(k, 
fvalue.getChildBlocks()));
 
                // 3. Copy all functions 
                return ExecutionContextFactory.createContext(
@@ -247,7 +248,7 @@ public class ParamservUtils {
                return newProg;
        }
 
-       private static void recompileProgramBlocks(int k, List<ProgramBlock> 
pbs) {
+       public static void recompileProgramBlocks(int k, List<ProgramBlock> 
pbs) {
                // Reset the visit status from root
                for (ProgramBlock pb : pbs)
                        
DMLTranslator.resetHopsDAGVisitStatus(pb.getStatementBlock());
diff --git 
a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/opt/OptTreePlanMappingAbstract.java
 
b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/opt/OptTreePlanMappingAbstract.java
index eced04f..956ca12 100644
--- 
a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/opt/OptTreePlanMappingAbstract.java
+++ 
b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/opt/OptTreePlanMappingAbstract.java
@@ -84,6 +84,10 @@ public class OptTreePlanMappingAbstract extends 
OptTreePlanMapping
                return ret;
        }
        
+       public ProgramBlock getMappedProgramBlock(long id) {
+               return (ProgramBlock) _id_rtprog.get(id);
+       }
+       
        public void replaceMapping( ProgramBlock pb, OptNode n ) {
                long id = n.getID();
                _id_rtprog.put(id, pb);
diff --git 
a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
 
b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
index 7a6933e..63ae8af 100644
--- 
a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
+++ 
b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.sysds.api.DMLScript;
 import org.apache.sysds.common.Types.FileFormat;
+import org.apache.sysds.common.Types.OpOpN;
 import org.apache.sysds.conf.ConfigurationManager;
 import org.apache.sysds.hops.AggBinaryOp;
 import org.apache.sysds.hops.AggBinaryOp.MMultMethod;
@@ -66,6 +67,7 @@ import 
org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
 import org.apache.sysds.runtime.controlprogram.caching.MatrixObject.UpdateType;
 import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
 import org.apache.sysds.runtime.controlprogram.context.SparkExecutionContext;
+import org.apache.sysds.runtime.controlprogram.paramserv.ParamservUtils;
 import org.apache.sysds.runtime.controlprogram.parfor.ResultMergeLocalFile;
 import 
org.apache.sysds.runtime.controlprogram.parfor.opt.CostEstimator.ExcludeType;
 import 
org.apache.sysds.runtime.controlprogram.parfor.opt.CostEstimator.TestMeasure;
@@ -1247,7 +1249,7 @@ public class OptimizerRuleBased extends Optimizer
                                        long id = c.getID();
                                        c.setK(tmpK);
                                        ParForProgramBlock pfpb = 
(ParForProgramBlock) 
-                                               
OptTreeConverter.getAbstractPlanMapping().getMappedProg(id)[1];
+                                               
OptTreeConverter.getAbstractPlanMapping().getMappedProgramBlock(id);
                                        pfpb.setDegreeOfParallelism(tmpK);
                                        
                                        //distribute remaining parallelism
@@ -1275,6 +1277,13 @@ public class OptimizerRuleBased extends Optimizer
                                                mhop.setMaxNumThreads(1); //set 
max constraint in hop
                                                c.setK(1); //set optnode k (for 
explain)
                                        }
+                                       
+                                       //if parfor contains eval call, make 
unoptimized functions single-threaded
+                                       if( HopRewriteUtils.isNary(h, 
OpOpN.EVAL) ) {
+                                               ProgramBlock pb = 
OptTreeConverter.getAbstractPlanMapping().getMappedProgramBlock(n.getID());
+                                               
pb.getProgram().getFunctionProgramBlocks(false)
+                                                       .forEach((fname, 
fvalue) -> ParamservUtils.recompileProgramBlocks(1, fvalue.getChildBlocks()));
+                                       }
                                }
                                else
                                        rAssignRemainingParallelism(c, parforK, 
opsK);
@@ -1284,7 +1293,7 @@ public class OptimizerRuleBased extends Optimizer
                        if( recompileSB ) {
                                try {
                                        //guaranteed to be a last-level block 
(see hop change)
-                                       ProgramBlock pb = (ProgramBlock) 
OptTreeConverter.getAbstractPlanMapping().getMappedProg(n.getID())[1];
+                                       ProgramBlock pb = 
OptTreeConverter.getAbstractPlanMapping().getMappedProgramBlock(n.getID());
                                        
Recompiler.recompileProgramBlockInstructions(pb);
                                }
                                catch(Exception ex){
@@ -1356,7 +1365,7 @@ public class OptimizerRuleBased extends Optimizer
                
                // modify rtprog
                ParForProgramBlock pfpb = (ParForProgramBlock) OptTreeConverter
-                                     
.getAbstractPlanMapping().getMappedProg(id)[1];
+                       .getAbstractPlanMapping().getMappedProgramBlock(id);
                pfpb.setTaskPartitioner(partitioner);
                
                // modify plan
@@ -2403,10 +2412,9 @@ public class OptimizerRuleBased extends Optimizer
        {
                boolean ret = false;
                
-               if( n.getNodeType() == NodeType.PARFOR )
-               {
-                       ParForProgramBlock pfpb = (ParForProgramBlock) 
OptTreeConverter
-                                                               
.getAbstractPlanMapping().getMappedProg(n.getID())[1];  
+               if( n.getNodeType() == NodeType.PARFOR ) {
+                       ProgramBlock pfpb = OptTreeConverter
+                               
.getAbstractPlanMapping().getMappedProgramBlock(n.getID());
                        ret = (parfor == pfpb);
                }
                
@@ -2425,7 +2433,7 @@ public class OptimizerRuleBased extends Optimizer
                if( n.getNodeType()==NodeType.PARFOR )
                {
                        ParForProgramBlock pfpb = (ParForProgramBlock) 
OptTreeConverter
-                                                                       
.getAbstractPlanMapping().getMappedProg(n.getID())[1];
+                               
.getAbstractPlanMapping().getMappedProgramBlock(n.getID());
                        pbs.add(pfpb);
                }
                
diff --git 
a/src/test/java/org/apache/sysds/test/functions/misc/FunctionPotpourriTest.java 
b/src/test/java/org/apache/sysds/test/functions/misc/FunctionPotpourriTest.java
index 9181351..89ec265 100644
--- 
a/src/test/java/org/apache/sysds/test/functions/misc/FunctionPotpourriTest.java
+++ 
b/src/test/java/org/apache/sysds/test/functions/misc/FunctionPotpourriTest.java
@@ -48,6 +48,7 @@ public class FunctionPotpourriTest extends AutomatedTestBase
        private final static String TEST_NAME17 = 
"FunPotpourriNamedArgsQuotedAssign";
        private final static String TEST_NAME18 = 
"FunPotpourriMultiReturnBuiltin1";
        private final static String TEST_NAME19 = 
"FunPotpourriMultiReturnBuiltin2";
+       private final static String TEST_NAME20 = 
"FunPotpourriNestedParforEval";
        
        private final static String TEST_DIR = "functions/misc/";
        private final static String TEST_CLASS_DIR = TEST_DIR + 
FunctionPotpourriTest.class.getSimpleName() + "/";
@@ -74,6 +75,7 @@ public class FunctionPotpourriTest extends AutomatedTestBase
                addTestConfiguration( TEST_NAME17, new 
TestConfiguration(TEST_CLASS_DIR, TEST_NAME17, new String[] { "R" }) );
                addTestConfiguration( TEST_NAME18, new 
TestConfiguration(TEST_CLASS_DIR, TEST_NAME18, new String[] { "R" }) );
                addTestConfiguration( TEST_NAME19, new 
TestConfiguration(TEST_CLASS_DIR, TEST_NAME19, new String[] { "R" }) );
+               addTestConfiguration( TEST_NAME20, new 
TestConfiguration(TEST_CLASS_DIR, TEST_NAME20, new String[] { "R" }) );
        }
 
        @Test
@@ -181,6 +183,11 @@ public class FunctionPotpourriTest extends 
AutomatedTestBase
                runFunctionTest( TEST_NAME19, false );
        }
        
+       @Test
+       public void testFunctionNestedParforEval() {
+               runFunctionTest( TEST_NAME20, false );
+       }
+       
        private void runFunctionTest(String testName, boolean error) {
                TestConfiguration config = getTestConfiguration(testName);
                loadTestConfiguration(config);
diff --git a/src/test/scripts/functions/misc/FunPotpourriNestedParforEval.dml 
b/src/test/scripts/functions/misc/FunPotpourriNestedParforEval.dml
new file mode 100644
index 0000000..4dc37cb
--- /dev/null
+++ b/src/test/scripts/functions/misc/FunPotpourriNestedParforEval.dml
@@ -0,0 +1,36 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+foo1 = function(Matrix[Double] A, Matrix[Double] B) return (Matrix[Double] C) {
+  while(FALSE){} # no inlining
+  C = A %*% B + 7;
+}
+
+X1 = matrix(1.1, 100, 100)
+X2 = matrix(1.2, 100, 100)
+f = "foo1";
+
+R = matrix(0, 100, 100)
+parfor(i in 1:nrow(R) )
+  parfor(j in 1:ncol(R) )
+    R[i,j] = sum(eval(f, list(A=X1, B=X2)));
+
+print("out: " + sum(R))

Reply via email to