This is an automated email from the ASF dual-hosted git repository.

mboehm7 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/main by this push:
     new 33b9d41  [SYSTEMDS-3342] Fix remote parfor for cleaning pipeline enumeration, II
33b9d41 is described below

commit 33b9d41d7689b1ff0ab02fad060593c0bef5a5c1
Author: Matthias Boehm <[email protected]>
AuthorDate: Sun Apr 3 01:27:56 2022 +0200

    [SYSTEMDS-3342] Fix remote parfor for cleaning pipeline enumeration, II
    
    This patch makes additional fixes for cleaning pipeline enumeration in
    remote parfor loops.
    
    * Proper export of modified frames/list result variables
    * Fix bandit builtin to avoid unnecessary list result variables
    * Fix special cases of eval function loading (duplicate functions in the runtime program)
    * Fix aggregate binary typos in method names
    * Fix ultra-sparse matrix multiplication selection
---
 scripts/builtin/bandit.dml                         |  8 +-
 .../org/apache/sysds/parser/DMLTranslator.java     |  6 +-
 .../sysds/parser/FunctionCallIdentifier.java       |  3 +-
 .../org/apache/sysds/parser/StatementBlock.java    |  7 +-
 .../apache/sysds/runtime/codegen/CodegenUtils.java |  1 -
 .../federated/MatrixLineagePair.java               |  1 -
 .../controlprogram/parfor/RemoteParForUtils.java   | 89 +++++-----------------
 .../cp/AggregateBinaryCPInstruction.java           |  4 +-
 .../instructions/cp/EvalNaryCPInstruction.java     | 11 ++-
 .../sysds/runtime/matrix/data/LibMatrixMult.java   | 10 +--
 10 files changed, 49 insertions(+), 91 deletions(-)

diff --git a/scripts/builtin/bandit.dml b/scripts/builtin/bandit.dml
index dd5b5d8..d017556 100644
--- a/scripts/builtin/bandit.dml
+++ b/scripts/builtin/bandit.dml
@@ -239,7 +239,8 @@ run_with_hyperparam = function(Frame[Unknown] ph_pip, 
Integer r_i, Matrix[Double
     [hp, applyFunctions, no_of_res, no_of_flag_vars] = getHyperparam(op, 
param, r_i, default, enablePruning)
     hpForPruning = matrix(0, rows=1, cols=ncol(op))
     changesByOp = matrix(0, rows=1, cols=ncol(op))
-    metaList["applyFunc"] = applyFunctions
+    metaList2 = metaList; #ensure metaList is no result var
+    metaList2["applyFunc"] = applyFunctions
     for(r in 1:no_of_res)
     {
       # as the matrix first block of r rows belongs to first operator and r+1 
block of rows to second operator 
@@ -260,13 +261,13 @@ run_with_hyperparam = function(Frame[Unknown] ph_pip, 
Integer r_i, Matrix[Double
         {
           pipList = list(ph = op, hp = hp_matrix, flags = no_of_flag_vars)
           [accuracy, evalHp, hpForPruning, changesByOp, changesByPip] = 
crossV(X=X, y=Y, cvk=cvk, evalFunHp=evalFunHp,
-            pipList=pipList, metaList=metaList, hpForPruning=hpForPruning, 
+            pipList=pipList, metaList=metaList2, hpForPruning=hpForPruning, 
           changesByOp=changesByOp, evalFunc=evaluationFunc, ref=ref)
         }
         else 
         {
           [eXtrain, eYtrain, eXtest, eYtest, Tr, hpForPruning, changesByOp, 
changesByPip] = executePipeline(pipeline=op, 
-            Xtrain=X, Ytrain=Y, Xtest=Xtest, Ytest=Ytest, metaList=metaList,  
hyperParameters=hp_matrix, hpForPruning=hpForPruning,
+            Xtrain=X, Ytrain=Y, Xtest=Xtest, Ytest=Ytest, metaList=metaList2,  
hyperParameters=hp_matrix, hpForPruning=hpForPruning,
             changesByOp=changesByOp, flagsCount=no_of_flag_vars, test=TRUE, 
verbose=FALSE)
           if(max(eYtrain) == min(eYtrain)) 
             print("Y contains only one class")
@@ -299,7 +300,6 @@ run_with_hyperparam = function(Frame[Unknown] ph_pip, 
Integer r_i, Matrix[Double
   output_hyperparam = removeEmpty(target=cbind(output_accuracy, output_hp), 
margin="rows", select = sel)
   output_operator = removeEmpty(target=cbind(output_accuracy, 
output_pipelines), margin="rows", select = sel)
   changesByPipMatrix = removeEmpty(target=changesByPipMatrix, margin="rows", 
select = sel)
-
 }
 
 # extract the hyper-parameters for pipelines
diff --git a/src/main/java/org/apache/sysds/parser/DMLTranslator.java 
b/src/main/java/org/apache/sysds/parser/DMLTranslator.java
index ef51904..8383318 100644
--- a/src/main/java/org/apache/sysds/parser/DMLTranslator.java
+++ b/src/main/java/org/apache/sysds/parser/DMLTranslator.java
@@ -152,6 +152,10 @@ public class DMLTranslator
        }
        
        public void validateFunction(DMLProgram dmlp, FunctionStatementBlock 
fsb) {
+               validateFunction(dmlp, fsb, false);
+       }
+       
+       public void validateFunction(DMLProgram dmlp, FunctionStatementBlock 
fsb, boolean conditional) {
                HashMap<String, ConstIdentifier> constVars = new HashMap<>();
                VariableSet vs = new VariableSet();
        
@@ -162,7 +166,7 @@ public class DMLTranslator
                                currVar.setDimensions(0, 0);
                        vs.addVariable(currVar.getName(), currVar);
                }
-               fsb.validate(dmlp, vs, constVars, false);
+               fsb.validate(dmlp, vs, constVars, conditional);
        }
 
        public void liveVariableAnalysis(DMLProgram dmlp) {
diff --git a/src/main/java/org/apache/sysds/parser/FunctionCallIdentifier.java 
b/src/main/java/org/apache/sysds/parser/FunctionCallIdentifier.java
index 173a17e..f60047f 100644
--- a/src/main/java/org/apache/sysds/parser/FunctionCallIdentifier.java
+++ b/src/main/java/org/apache/sysds/parser/FunctionCallIdentifier.java
@@ -139,7 +139,8 @@ public class FunctionCallIdentifier extends DataIdentifier
                        fblock = dmlp.getFunctionStatementBlock(_namespace, 
_name);
                        if( fblock == null ) {
                                raiseValidateError("Builtin function '"+_name+ 
"': script loaded "
-                                       + "but function not found. Is there a 
typo in the function name?");
+                                       + "but function not found. Is there a 
typo in the function name?", conditional);
+                               return; //robustness on warnings (conditional)
                        }
                }
                
diff --git a/src/main/java/org/apache/sysds/parser/StatementBlock.java 
b/src/main/java/org/apache/sysds/parser/StatementBlock.java
index 1604a28..ca78220 100644
--- a/src/main/java/org/apache/sysds/parser/StatementBlock.java
+++ b/src/main/java/org/apache/sysds/parser/StatementBlock.java
@@ -1057,12 +1057,13 @@ public class StatementBlock extends 
LiveVariableAnalysis implements ParseInfo
                                DataIdentifier target = targetList.get(j);
                                // set target properties (based on type info in 
function call statement return params)
                                FunctionCallIdentifier fci = 
(FunctionCallIdentifier)source;
-                               FunctionStatement fstmt = 
(FunctionStatement)_dmlProg
-                                       
.getFunctionStatementBlock(fci.getNamespace(), fci.getName()).getStatement(0);
-                               if (fstmt == null){
+                               FunctionStatementBlock fblock = 
_dmlProg.getFunctionStatementBlock(fci.getNamespace(), fci.getName());
+                               if (fblock == null){
                                        fci.raiseValidateError(" function " + 
fci.getName() 
                                                + " is undefined in namespace " 
+ fci.getNamespace(), conditional);
+                                       return;
                                }
+                               FunctionStatement fstmt = 
(FunctionStatement)fblock.getStatement(0);
                                if (!(target instanceof IndexedIdentifier)){
                                        
target.setProperties(fstmt.getOutputParams().get(j));
                                }
diff --git a/src/main/java/org/apache/sysds/runtime/codegen/CodegenUtils.java 
b/src/main/java/org/apache/sysds/runtime/codegen/CodegenUtils.java
index 90f7089..898d01d 100644
--- a/src/main/java/org/apache/sysds/runtime/codegen/CodegenUtils.java
+++ b/src/main/java/org/apache/sysds/runtime/codegen/CodegenUtils.java
@@ -194,7 +194,6 @@ public class CodegenUtils
        private synchronized static Class<?> compileClassJanino(String name, 
String src) {
                try {
                        // compile source code
-                       // (in recent spark versions )
                        SimpleCompiler compiler = new SimpleCompiler();
                        if( _mainClassLoader != null )
                                compiler.setParentClassLoader(_mainClassLoader);
diff --git 
a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/MatrixLineagePair.java
 
b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/MatrixLineagePair.java
index a6d73cd..0e1380e 100644
--- 
a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/MatrixLineagePair.java
+++ 
b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/MatrixLineagePair.java
@@ -23,7 +23,6 @@ import org.apache.commons.lang3.tuple.MutablePair;
 import org.apache.sysds.common.Types.DataType;
 import org.apache.sysds.hops.fedplanner.FTypes.FType;
 import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
-import org.apache.sysds.runtime.controlprogram.federated.FederationMap;
 import org.apache.sysds.runtime.lineage.LineageItem;
 import org.apache.sysds.runtime.meta.DataCharacteristics;
 
diff --git 
a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/RemoteParForUtils.java
 
b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/RemoteParForUtils.java
index 073d4a2..4a5a898 100644
--- 
a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/RemoteParForUtils.java
+++ 
b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/RemoteParForUtils.java
@@ -26,14 +26,11 @@ import java.util.List;
 import java.util.Map.Entry;
 
 import org.apache.commons.logging.Log;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.sysds.api.DMLScript;
 import org.apache.sysds.conf.ConfigurationManager;
+import org.apache.sysds.hops.OptimizerUtils;
 import org.apache.sysds.common.Types.DataType;
 import org.apache.sysds.parser.ParForStatementBlock.ResultVar;
 import org.apache.sysds.runtime.DMLRuntimeException;
@@ -46,6 +43,9 @@ import 
org.apache.sysds.runtime.controlprogram.parfor.stat.InfrastructureAnalyze
 import org.apache.sysds.runtime.controlprogram.parfor.stat.Stat;
 import org.apache.sysds.runtime.controlprogram.parfor.util.IDHandler;
 import org.apache.sysds.runtime.instructions.cp.Data;
+import org.apache.sysds.runtime.instructions.cp.ListObject;
+import org.apache.sysds.runtime.io.FileFormatProperties;
+import org.apache.sysds.runtime.io.ListWriter;
 import org.apache.sysds.runtime.lineage.Lineage;
 import org.apache.sysds.runtime.lineage.LineageItem;
 import org.apache.sysds.runtime.lineage.LineageParser;
@@ -97,62 +97,6 @@ public class RemoteParForUtils
                }
        }
 
-       public static void exportResultVariables( long workerID, 
LocalVariableMap vars, ArrayList<ResultVar> resultVars, 
OutputCollector<Writable, Writable> out ) throws IOException {
-               exportResultVariables(workerID, vars, resultVars, null, out);
-       }
-       
-       /**
-        * For remote MR parfor workers.
-        * 
-        * @param workerID worker id
-        * @param vars local variable map
-        * @param resultVars list of result variables
-        * @param rvarFnames ?
-        * @param out output collectors
-        * @throws IOException if IOException occurs
-        */
-       public static void exportResultVariables( long workerID, 
LocalVariableMap vars, ArrayList<ResultVar> resultVars, 
-                       HashMap<String,String> rvarFnames, 
OutputCollector<Writable, Writable> out ) throws IOException
-       {
-               //create key and value for reuse
-               LongWritable okey = new LongWritable( workerID ); 
-               Text ovalue = new Text();
-               
-               //foreach result variables probe if export necessary
-               for( ResultVar rvar : resultVars )
-               {
-                       Data dat = vars.get( rvar._name );
-                       
-                       //export output variable to HDFS (see RunMRJobs)
-                       if ( dat != null && dat.getDataType() == 
DataType.MATRIX ) 
-                       {
-                               MatrixObject mo = (MatrixObject) dat;
-                               if( mo.isDirty() )
-                               {
-                                       if( rvarFnames!=null ) {
-                                               String fname = rvarFnames.get( 
rvar._name );
-                                               if( fname!=null )
-                                                       mo.setFileName( fname );
-                                                       
-                                               //export result var (iff 
actually modified in parfor)
-                                               mo.exportData(); //note: this 
is equivalent to doing it in close (currently not required because 1 Task=1Map 
tasks, hence only one map invocation)              
-                                               rvarFnames.put(rvar._name, 
mo.getFileName());
-                                       }
-                                       else {
-                                               //export result var (iff 
actually modified in parfor)
-                                               mo.exportData(); //note: this 
is equivalent to doing it in close (currently not required because 1 Task=1Map 
tasks, hence only one map invocation)
-                                       }
-                                       
-                                       //pass output vars (scalars by value, 
matrix by ref) to result
-                                       //(only if actually exported, hence in 
check for dirty, otherwise potential problems in result merge)
-                                       String datStr = 
ProgramConverter.serializeDataObject(rvar._name, mo);
-                                       ovalue.set( datStr );
-                                       out.collect( okey, ovalue );
-                               }
-                       }
-               }
-       }
-       
        /**
         * For remote Spark parfor workers. This is a simplified version 
compared to MR.
         * 
@@ -170,18 +114,23 @@ public class RemoteParForUtils
                //foreach result variables probe if export necessary
                for( ResultVar rvar : resultVars ) {
                        Data dat = vars.get( rvar._name );
-                       //export output variable to HDFS (see RunMRJobs)
-                       if ( dat != null && dat.getDataType() == 
DataType.MATRIX )  {
-                               MatrixObject mo = (MatrixObject) dat;
-                               if( mo.isDirty() ) {
-                                       //export result var (iff actually 
modified in parfor)
-                                       mo.exportData(); 
-                                       //pass output vars (scalars by value, 
matrix by ref) to result
-                                       //(only if actually exported, hence in 
check for dirty, otherwise potential problems in result merge)
-                                       ret.add( 
ProgramConverter.serializeDataObject(rvar._name, mo) );
+                       
+                       if ( dat != null && dat.getDataType().isMatrixOrFrame() 
) {
+                               CacheableData<?> cd = (CacheableData<?>) dat;
+                               //export result var (iff actually modified in 
parfor)
+                               if( cd.isDirty() ) {
+                                       cd.exportData();
+                                       //pass output vars to result (only if 
actually exported)
+                                       ret.add( 
ProgramConverter.serializeDataObject(rvar._name, dat) );
                                }
                                //cleanup pinned result variable from buffer 
pool
-                               mo.freeEvictedBlob();
+                               cd.freeEvictedBlob();
+                       }
+                       else if (dat instanceof ListObject) {
+                               String fname = 
OptimizerUtils.getUniqueTempFileName();
+                               ListWriter.writeListToHDFS((ListObject) dat, 
fname, "binary",
+                                       new 
FileFormatProperties(ConfigurationManager.getBlocksize()));
+                               ret.add( 
ProgramConverter.serializeDataObject(rvar._name, dat) );
                        }
                }
                
diff --git 
a/src/main/java/org/apache/sysds/runtime/instructions/cp/AggregateBinaryCPInstruction.java
 
b/src/main/java/org/apache/sysds/runtime/instructions/cp/AggregateBinaryCPInstruction.java
index 60dfdd1..d421c21 100644
--- 
a/src/main/java/org/apache/sysds/runtime/instructions/cp/AggregateBinaryCPInstruction.java
+++ 
b/src/main/java/org/apache/sysds/runtime/instructions/cp/AggregateBinaryCPInstruction.java
@@ -82,10 +82,10 @@ public class AggregateBinaryCPInstruction extends 
BinaryCPInstruction {
                else if(transposeLeft || transposeRight)
                        processTransposedFusedAggregateBinary(ec);
                else
-                       precessNormal(ec);
+                       processNormal(ec);
        }
 
-       private void precessNormal(ExecutionContext ec) {
+       private void processNormal(ExecutionContext ec) {
                // get inputs
                MatrixBlock matBlock1 = ec.getMatrixInput(input1.getName());
                MatrixBlock matBlock2 = ec.getMatrixInput(input2.getName());
diff --git 
a/src/main/java/org/apache/sysds/runtime/instructions/cp/EvalNaryCPInstruction.java
 
b/src/main/java/org/apache/sysds/runtime/instructions/cp/EvalNaryCPInstruction.java
index 46fbeeb..fc6132f 100644
--- 
a/src/main/java/org/apache/sysds/runtime/instructions/cp/EvalNaryCPInstruction.java
+++ 
b/src/main/java/org/apache/sysds/runtime/instructions/cp/EvalNaryCPInstruction.java
@@ -218,6 +218,7 @@ public class EvalNaryCPInstruction extends 
BuiltinNaryCPInstruction {
                        fsbs.get(Builtins.getInternalFName(name, 
dt)).getDMLProg();
                
                //filter already existing functions (e.g., already loaded 
internally-called functions)
+               //note: in remote parfor the runtime program might contain more 
functions than the DML program
                fsbs = (dmlp.getBuiltinFunctionDictionary() == null) ? fsbs : 
fsbs.entrySet().stream()
                        .filter(e -> 
!dmlp.getBuiltinFunctionDictionary().containsFunction(e.getKey()))
                        .collect(Collectors.toMap(e -> e.getKey(), e -> 
e.getValue()));
@@ -237,7 +238,9 @@ public class EvalNaryCPInstruction extends 
BuiltinNaryCPInstruction {
                // validate functions, in two passes for cross references
                for( FunctionStatementBlock fsb : fsbs.values() ) {
                        dmlt.liveVariableAnalysisFunction(dmlp, fsb);
-                       dmlt.validateFunction(dmlp, fsb);
+                       //mark as conditional (warnings instead of errors) 
because internally
+                       //called functions might not be available in dmlp but 
prog in remote parfor
+                       dmlt.validateFunction(dmlp, fsb, true);
                }
                
                // compile hop dags, rewrite hop dags and compile lop dags
@@ -260,8 +263,10 @@ public class EvalNaryCPInstruction extends 
BuiltinNaryCPInstruction {
                for( Entry<String,FunctionStatementBlock> fsb : fsbs.entrySet() 
) {
                        FunctionProgramBlock fpb = (FunctionProgramBlock) dmlt
                                .createRuntimeProgramBlock(prog, 
fsb.getValue(), ConfigurationManager.getDMLConfig());
-                       prog.addFunctionProgramBlock(nsName, fsb.getKey(), fpb, 
true);  // optimized
-                       prog.addFunctionProgramBlock(nsName, fsb.getKey(), fpb, 
false); // unoptimized -> eval
+                       if(!prog.containsFunctionProgramBlock(nsName, 
fsb.getKey(), true))
+                               prog.addFunctionProgramBlock(nsName, 
fsb.getKey(), fpb, true);  // optimized
+                       if(!prog.containsFunctionProgramBlock(nsName, 
fsb.getKey(), false))
+                               prog.addFunctionProgramBlock(nsName, 
fsb.getKey(), fpb, false); // unoptimized -> eval
                }
        }
        
diff --git 
a/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixMult.java 
b/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixMult.java
index c619c61..0e003ca 100644
--- a/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixMult.java
+++ b/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixMult.java
@@ -181,7 +181,7 @@ public class LibMatrixMult
                        k = 1;
 
                if(k <= 1)
-                       singleThreadMatrixMult(m1, m2, ret, ultraSparse, 
sparse, tm2, m1Perm, fixedRet);
+                       singleThreadedMatrixMult(m1, m2, ret, ultraSparse, 
sparse, tm2, m1Perm, fixedRet);
                else
                        parallelMatrixMult(m1, m2, ret, k, ultraSparse, sparse, 
tm2, m1Perm);
 
@@ -191,7 +191,7 @@ public class LibMatrixMult
                return ret;
        }
 
-       private static void singleThreadMatrixMult(MatrixBlock m1, MatrixBlock 
m2, MatrixBlock ret,  
+       private static void singleThreadedMatrixMult(MatrixBlock m1, 
MatrixBlock m2, MatrixBlock ret,  
                boolean ultraSparse, boolean sparse, boolean tm2, boolean 
m1Perm, boolean fixedRet){
                // prepare row-upper for special cases of vector-matrix
                final boolean pm2 = !ultraSparse && 
checkParMatrixMultRightInputRows(m1, m2, Integer.MAX_VALUE);
@@ -3967,7 +3967,8 @@ public class LibMatrixMult
                        || (m1Perm && OptimizerUtils.getSparsity(m2.rlen, 
m2.clen, m2.nonZeros)<1.0)
                        || ((m1.isUltraSparse(false) || 
m2.isUltraSparse(false)) 
                                && outSp < 
MatrixBlock.ULTRA_SPARSITY_TURN_POINT2)
-                       || (m1.getSparsity() < 
MatrixBlock.ULTRA_SPARSITY_TURN_POINT2
+                       || (m1.isInSparseFormat() // otherwise no matching 
branch
+                               && m1.getSparsity() < 
MatrixBlock.ULTRA_SPARSITY_TURN_POINT2
                                && m1.getNonZeros() < 
MatrixBlock.ULTRA_SPARSE_BLOCK_NNZ
                                && m1.getLength()+m2.getLength() < 
(long)m1.rlen*m2.clen
                                && outSp < MatrixBlock.SPARSITY_TURN_POINT);
@@ -3994,8 +3995,7 @@ public class LibMatrixMult
                //transpose if dense-dense, skinny rhs matrix (not vector), and 
memory guarded by output 
                if( tm2 ) {
                        MatrixBlock tmpBlock = new MatrixBlock(m2.clen, 
m2.rlen, m2.sparse);
-                       LibMatrixReorg.reorg(m2, tmpBlock, new 
ReorgOperator(SwapIndex.getSwapIndexFnObject()));
-                       ret = tmpBlock;
+                       ret = LibMatrixReorg.reorg(m2, tmpBlock, new 
ReorgOperator(SwapIndex.getSwapIndexFnObject()));
                }
                
                return ret;

Reply via email to