Repository: systemml
Updated Branches:
  refs/heads/master 100f2d606 -> 1d83cedb7

http://git-wip-us.apache.org/repos/asf/systemml/blob/1d83cedb/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ProgramConverter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ProgramConverter.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ProgramConverter.java
index afdf8be..8dc86d4 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ProgramConverter.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ProgramConverter.java
@@ -268,7 +268,6 @@ public class ProgramConverter
 {
     ArrayList<Instruction> predinst = createDeepCopyInstructionSet(wpb.getPredicate(), pid, IDPrefix, prog, fnStack, fnCreated, plain, true);
     WhileProgramBlock tmpPB = new WhileProgramBlock(prog, predinst);
-    tmpPB.setPredicateResultVar( wpb.getPredicateResultVar() );
     tmpPB.setStatementBlock( createWhileStatementBlockCopy((WhileStatementBlock) wpb.getStatementBlock(), pid, plain, forceDeepCopy) );
     tmpPB.setThreadID(pid);
@@ -283,7 +282,6 @@ public class ProgramConverter
 {
     ArrayList<Instruction> predinst = createDeepCopyInstructionSet(ipb.getPredicate(), pid, IDPrefix, prog, fnStack, fnCreated, plain, true);
     IfProgramBlock tmpPB = new IfProgramBlock(prog, predinst);
-    tmpPB.setPredicateResultVar( ipb.getPredicateResultVar() );
     tmpPB.setStatementBlock( createIfStatementBlockCopy((IfStatementBlock)ipb.getStatementBlock(), pid, plain, forceDeepCopy ) );
     tmpPB.setThreadID(pid);
@@ -297,7 +295,7 @@ public class ProgramConverter
 public static ForProgramBlock createDeepCopyForProgramBlock(ForProgramBlock fpb, long pid, int IDPrefix, Program prog, HashSet<String> fnStack, HashSet<String> fnCreated, boolean plain, boolean forceDeepCopy)
   throws DMLRuntimeException
 {
-    ForProgramBlock tmpPB = new ForProgramBlock(prog,fpb.getIterablePredicateVars());
+    ForProgramBlock tmpPB = new ForProgramBlock(prog,fpb.getIterVar());
     tmpPB.setStatementBlock( createForStatementBlockCopy((ForStatementBlock)fpb.getStatementBlock(), pid, plain, forceDeepCopy));
     tmpPB.setThreadID(pid);
@@ -313,7 +311,7 @@ public class ProgramConverter
 public static ForProgramBlock createShallowCopyForProgramBlock(ForProgramBlock fpb, Program prog )
   throws DMLRuntimeException
 {
-    ForProgramBlock tmpPB = new ForProgramBlock(prog,fpb.getIterablePredicateVars());
+    ForProgramBlock tmpPB = new ForProgramBlock(prog,fpb.getIterVar());
     tmpPB.setFromInstructions( fpb.getFromInstructions() );
     tmpPB.setToInstructions( fpb.getToInstructions() );
@@ -330,9 +328,9 @@ public class ProgramConverter
     ParForProgramBlock tmpPB = null;
     if( IDPrefix == -1 ) //still on master node
-      tmpPB = new ParForProgramBlock(prog,pfpb.getIterablePredicateVars(),pfpb.getParForParams());
+      tmpPB = new ParForProgramBlock(prog,pfpb.getIterVar(), pfpb.getParForParams());
     else //child of remote ParWorker at any level
-      tmpPB = new ParForProgramBlock(IDPrefix, prog, pfpb.getIterablePredicateVars(),pfpb.getParForParams());
+      tmpPB = new ParForProgramBlock(IDPrefix, prog, pfpb.getIterVar(), pfpb.getParForParams());
     tmpPB.setStatementBlock( createForStatementBlockCopy( (ForStatementBlock) pfpb.getStatementBlock(), pid, plain, forceDeepCopy) );
     tmpPB.setThreadID(pid);
@@ -584,7 +582,7 @@ public class ProgramConverter
     try
     {
-      if( ConfigurationManager.getCompilerConfigFlag(ConfigType.ALLOW_PARALLEL_DYN_RECOMPILATION)
+      if( ConfigurationManager.getCompilerConfigFlag(ConfigType.ALLOW_PARALLEL_DYN_RECOMPILATION)
         && sb != null //forced deep copy for function recompile
         && (Recompiler.requiresRecompilation( sb.getPredicateHops() ) || forceDeepCopy) )
       {
@@ -729,8 +727,8 @@ public class ProgramConverter
   throws DMLRuntimeException
 {
     ArrayList<ProgramBlock> pbs = body.getChildBlocks();
-    ArrayList<String> rVnames = body.getResultVarNames();
-    ExecutionContext ec = body.getEc();
+    ArrayList<String> rVnames = body.getResultVarNames();
+    ExecutionContext ec = body.getEc();
     if( pbs.isEmpty() )
       return PARFORBODY_BEGIN + PARFORBODY_END;
@@ -788,7 +786,7 @@ public class ProgramConverter
     sb.append( PARFORBODY_END );
-    return sb.toString();
+    return sb.toString();
 }
 private static String serializeProgram( Program prog, ArrayList<ProgramBlock> pbs, HashMap<String, byte[]> clsMap )
@@ -797,7 +795,7 @@ public class ProgramConverter
     //note program contains variables, programblocks and function program blocks
     //but in order to avoid redundancy, we only serialize function program blocks
-    HashMap<String, FunctionProgramBlock> fpb = prog.getFunctionProgramBlocks();
+    HashMap<String, FunctionProgramBlock> fpb = prog.getFunctionProgramBlocks();
     HashSet<String> cand = new HashSet<String>();
     rFindSerializationCandidates(pbs, cand);
@@ -812,7 +810,7 @@ public class ProgramConverter
     if( pb instanceof WhileProgramBlock )
     {
       WhileProgramBlock wpb = (WhileProgramBlock) pb;
-      rFindSerializationCandidates(wpb.getChildBlocks(), cand );
+      rFindSerializationCandidates(wpb.getChildBlocks(), cand );
     }
     else if ( pb instanceof ForProgramBlock || pb instanceof ParForProgramBlock )
     {
@@ -870,7 +868,7 @@ public class ProgramConverter
     DataType datatype = dat.getDataType();
     ValueType valuetype = dat.getValueType();
     String value = null;
-    String[] matrixMetaData = null;
+    String[] matrixMetaData = null;
     switch( datatype )
     {
@@ -1041,24 +1039,6 @@ public class ProgramConverter
     return sb.toString();
 }
-  private static String serializeStringArray( String[] vars)
-  {
-    StringBuilder sb = new StringBuilder();
-    int count=0;
-    for( String s : vars )
-    {
-      if(count>0)
-        sb.append( ELEMENT_DELIM );
-      if( s != null )
-        sb.append( s );
-      else
-        sb.append( "null" );
-
-      count++;
-    }
-    return sb.toString();
-  }
-
 private static String serializeDataIdentifiers( ArrayList<DataIdentifier> var)
 {
     StringBuilder sb = new StringBuilder();
@@ -1157,8 +1137,6 @@ public class ProgramConverter
     sb.append( serializeInstructions( wpb.getPredicate(), clsMap ) );
     sb.append( PARFOR_INST_END );
     sb.append( COMPONENTS_DELIM );
-    sb.append( wpb.getPredicateResultVar() );
-    sb.append( COMPONENTS_DELIM );
     sb.append( PARFOR_INST_BEGIN );
     sb.append( serializeInstructions( wpb.getExitInstructions(), clsMap ) );
     sb.append( PARFOR_INST_END );
@@ -1170,7 +1148,7 @@ public class ProgramConverter
   else if ( pb instanceof ForProgramBlock && !(pb instanceof ParForProgramBlock ) )
   {
     ForProgramBlock fpb = (ForProgramBlock) pb;
-    sb.append( serializeStringArray(fpb.getIterablePredicateVars()) );
+    sb.append( fpb.getIterVar() );
     sb.append( COMPONENTS_DELIM );
     sb.append( PARFOR_INST_BEGIN );
     sb.append( serializeInstructions( fpb.getFromInstructions(), clsMap ) );
@@ -1200,7 +1178,7 @@ public class ProgramConverter
     if( PExecMode.valueOf( pfpb.getParForParams().get( ParForStatementBlock.EXEC_MODE )) == PExecMode.REMOTE_MR )
       throw new DMLRuntimeException( NOT_SUPPORTED_MR_PARFOR );
-    sb.append( serializeStringArray(pfpb.getIterablePredicateVars()) );
+    sb.append( pfpb.getIterVar() );
     sb.append( COMPONENTS_DELIM );
     sb.append( serializeStringArrayList( pfpb.getResultVariables()) );
     sb.append( COMPONENTS_DELIM );
@@ -1231,9 +1209,7 @@ public class ProgramConverter
     IfProgramBlock ipb = (IfProgramBlock) pb;
     sb.append( PARFOR_INST_BEGIN );
     sb.append( serializeInstructions(ipb.getPredicate(), clsMap) );
-    sb.append( PARFOR_INST_END );
-    sb.append( COMPONENTS_DELIM );
-    sb.append( ipb.getPredicateResultVar() );
+    sb.append( PARFOR_INST_END );
     sb.append( COMPONENTS_DELIM );
     sb.append( PARFOR_INST_BEGIN );
     sb.append( serializeInstructions(ipb.getExitInstructions(), clsMap) );
@@ -1470,11 +1446,8 @@ public class ProgramConverter
     String lin = in.substring( PARFOR_PB_WHILE.length(),in.length()-PARFOR_PB_END.length());
     HierarchyAwareStringTokenizer st = new HierarchyAwareStringTokenizer(lin, COMPONENTS_DELIM);
-    //LocalVariableMap vars = parseVariables(st.nextToken());
-
     //predicate instructions
     ArrayList<Instruction> inst = parseInstructions(st.nextToken(),id);
-    String var = st.nextToken();
     //exit instructions
     ArrayList<Instruction> exit = parseInstructions(st.nextToken(),id);
@@ -1483,10 +1456,8 @@ public class ProgramConverter
     ArrayList<ProgramBlock> pbs = rParseProgramBlocks(st.nextToken(), prog, id);
     WhileProgramBlock wpb = new WhileProgramBlock(prog,inst);
-    wpb.setPredicateResultVar(var);
     wpb.setExitInstructions2(exit);
     wpb.setChildBlocks(pbs);
-    //wpb.setVariables(vars);
     return wpb;
 }
@@ -1497,10 +1468,8 @@ public class ProgramConverter
     String lin = in.substring( PARFOR_PB_FOR.length(),in.length()-PARFOR_PB_END.length());
     HierarchyAwareStringTokenizer st = new HierarchyAwareStringTokenizer(lin, COMPONENTS_DELIM);
-    //LocalVariableMap vars = parseVariables(st.nextToken());
-
     //inputs
-    String[] iterPredVars = parseStringArray(st.nextToken());
+    String iterVar = st.nextToken();
     //instructions
     ArrayList<Instruction> from = parseInstructions(st.nextToken(),id);
@@ -1509,17 +1478,16 @@ public class ProgramConverter
     //exit instructions
     ArrayList<Instruction> exit = parseInstructions(st.nextToken(),id);
-
+
     //program blocks
     ArrayList<ProgramBlock> pbs = rParseProgramBlocks(st.nextToken(), prog, id);
-
-    ForProgramBlock fpb = new ForProgramBlock(prog, iterPredVars);
+
+    ForProgramBlock fpb = new ForProgramBlock(prog, iterVar);
     fpb.setFromInstructions(from);
     fpb.setToInstructions(to);
     fpb.setIncrementInstructions(incr);
     fpb.setExitInstructions(exit);
     fpb.setChildBlocks(pbs);
-    //fpb.setVariables(vars);
     return fpb;
 }
@@ -1530,10 +1498,8 @@ public class ProgramConverter
     String lin = in.substring( PARFOR_PB_PARFOR.length(),in.length()-PARFOR_PB_END.length());
     HierarchyAwareStringTokenizer st = new HierarchyAwareStringTokenizer(lin, COMPONENTS_DELIM);
-    //LocalVariableMap vars = parseVariables(st.nextToken());
-
     //inputs
-    String[] iterPredVars = parseStringArray(st.nextToken());
+    String iterVar = st.nextToken();
     ArrayList<String> resultVars = parseStringArrayList(st.nextToken());
     HashMap<String,String> params = parseStringHashMap(st.nextToken());
@@ -1544,11 +1510,11 @@ public class ProgramConverter
     //exit instructions
     ArrayList<Instruction> exit = parseInstructions(st.nextToken(), 0);
-
+
     //program blocks
     //reset id to preinit state, replaced during exec
     ArrayList<ProgramBlock> pbs = rParseProgramBlocks(st.nextToken(), prog, 0);
-
-    ParForProgramBlock pfpb = new ParForProgramBlock(id, prog, iterPredVars, params);
+
+    ParForProgramBlock pfpb = new ParForProgramBlock(id, prog, iterVar, params);
     pfpb.disableOptimization(); //already done in top-level parfor
     pfpb.setResultVariables(resultVars);
     pfpb.setFromInstructions(from);
@@ -1556,22 +1522,18 @@ public class ProgramConverter
     pfpb.setIncrementInstructions(incr);
     pfpb.setExitInstructions(exit);
     pfpb.setChildBlocks(pbs);
-    //pfpb.setVariables(vars);
     return pfpb;
 }
-  private static IfProgramBlock rParseIfProgramBlock( String in, Program prog, int id )
+  private static IfProgramBlock rParseIfProgramBlock( String in, Program prog, int id )
   throws DMLRuntimeException
 {
-    String lin = in.substring( PARFOR_PB_IF.length(),in.length()-PARFOR_PB_END.length());
+    String lin = in.substring( PARFOR_PB_IF.length(),in.length()-PARFOR_PB_END.length());
     HierarchyAwareStringTokenizer st = new HierarchyAwareStringTokenizer(lin, COMPONENTS_DELIM);
-    //LocalVariableMap vars = parseVariables(st.nextToken());
-
     //predicate instructions
     ArrayList<Instruction> inst = parseInstructions(st.nextToken(),id);
-    String var = st.nextToken();
     //exit instructions
     ArrayList<Instruction> exit = parseInstructions(st.nextToken(),id);
@@ -1581,23 +1543,19 @@ public class ProgramConverter
     ArrayList<ProgramBlock> pbs2 = rParseProgramBlocks(st.nextToken(), prog, id);
     IfProgramBlock ipb = new IfProgramBlock(prog,inst);
-    ipb.setPredicateResultVar(var);
     ipb.setExitInstructions2(exit);
     ipb.setChildBlocksIfBody(pbs1);
     ipb.setChildBlocksElseBody(pbs2);
-    //ipb.setVariables(vars);
     return ipb;
 }
-  private static FunctionProgramBlock rParseFunctionProgramBlock( String in, Program prog, int id )
+  private static FunctionProgramBlock rParseFunctionProgramBlock( String in, Program prog, int id )
   throws DMLRuntimeException
 {
-    String lin = in.substring( PARFOR_PB_FC.length(),in.length()-PARFOR_PB_END.length());
+    String lin = in.substring( PARFOR_PB_FC.length(),in.length()-PARFOR_PB_END.length());
     HierarchyAwareStringTokenizer st = new HierarchyAwareStringTokenizer(lin, COMPONENTS_DELIM);
-    //LocalVariableMap vars = parseVariables(st.nextToken());
-
     //inputs and outputs
     ArrayList<DataIdentifier> dat1 = parseDataIdentifiers(st.nextToken());
     ArrayList<DataIdentifier> dat2 = parseDataIdentifiers(st.nextToken());
@@ -1613,7 +1571,6 @@ public class ProgramConverter
     FunctionProgramBlock fpb = new FunctionProgramBlock(prog, tmp1, tmp2);
     fpb.setInstructions(inst);
     fpb.setChildBlocks(pbs);
-    //fpb.setVariables(vars);
     return fpb;
 }
@@ -1645,9 +1602,7 @@ public class ProgramConverter
     //only CP external functions, because no nested MR jobs for reblocks
     ExternalFunctionProgramBlockCP efpb = new ExternalFunctionProgramBlockCP(prog, tmp1, tmp2, dat3, basedir);
-    //efpb.setInstructions(inst);
     efpb.setChildBlocks(pbs);
-    //efpb.setVariables(vars);
     return efpb;
 }
@@ -1713,37 +1668,19 @@ public class ProgramConverter
 {
     ArrayList<String> vars = new ArrayList<String>();
     StringTokenizer st = new StringTokenizer(in,ELEMENT_DELIM);
-    while( st.hasMoreTokens() )
-    {
-      String tmp = st.nextToken();
+    while( st.hasMoreTokens() ) {
+      String tmp = st.nextToken();
       vars.add(tmp);
     }
     return vars;
 }
-  private static String[] parseStringArray( String in )
-  {
-    StringTokenizer st = new StringTokenizer(in, ELEMENT_DELIM);
-    int len = st.countTokens();
-    String[] a = new String[len];
-    for( int i=0; i<len; i++ )
-    {
-      String tmp = st.nextToken();
-      if( tmp.equals("null") )
-        a[i] = null;
-      else
-        a[i] = tmp;
-    }
-    return a;
-  }
-
 private static ArrayList<DataIdentifier> parseDataIdentifiers( String in )
 {
     ArrayList<DataIdentifier> vars = new ArrayList<DataIdentifier>();
     StringTokenizer st = new StringTokenizer(in, ELEMENT_DELIM);
-    while( st.hasMoreTokens() )
-    {
+    while( st.hasMoreTokens() ) {
      String tmp = st.nextToken();
      DataIdentifier dat = parseDataIdentifier( tmp );
      vars.add(dat);
@@ -1807,7 +1744,7 @@ public class ProgramConverter
       dat = new StringObject(name,valString);
       break;
     default:
-      throw new DMLRuntimeException("Unable to parse valuetype "+valuetype);
+      throw new DMLRuntimeException("Unable to parse valuetype "+valuetype);
   }
   break;
 }
@@ -1820,7 +1757,7 @@ public class ProgramConverter
     int bcols = Integer.parseInt( st.nextToken() );
     long nnz = Long.parseLong( st.nextToken() );
     InputInfo iin = InputInfo.stringToInputInfo( st.nextToken() );
-    OutputInfo oin = OutputInfo.stringToOutputInfo( st.nextToken() );
+    OutputInfo oin = OutputInfo.stringToOutputInfo( st.nextToken() );
     PartitionFormat partFormat = PartitionFormat.valueOf( st.nextToken() );
     UpdateType inplace = UpdateType.valueOf( st.nextToken() );
     MatrixCharacteristics mc = new MatrixCharacteristics(rows, cols, brows, bcols, nnz);
@@ -1940,7 +1877,7 @@ public class ProgramConverter
   public String nextToken()
   {
-    int nextDelim = determineNextSameLevelIndexOf(_str, _del);
+    int nextDelim = determineNextSameLevelIndexOf(_str, _del);
     String token = null;
     if(nextDelim < 0) {
@@ -1965,7 +1902,7 @@ public class ProgramConverter
     i2 = tmpdata.indexOf(LEVELIN);
     i3 = tmpdata.indexOf(LEVELOUT);
-    if( i1 < 0 ) return i1; //no pattern found at all
+    if( i1 < 0 ) return i1; //no pattern found at all
     min = i1; //min >= 0 by definition
     if( i2 >= 0 ) min = Math.min(min, i2);

http://git-wip-us.apache.org/repos/asf/systemml/blob/1d83cedb/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptTreeConverter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptTreeConverter.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptTreeConverter.java
index 1d5a195..d8cad78 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptTreeConverter.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptTreeConverter.java
@@ -181,8 +181,9 @@ public class OptTreeConverter
     _rtMap.putMapping(fpb, node);
     node.setExecType(ExecType.CP);
-    //TODO use constant value if known
-    node.addParam(ParamType.NUM_ITERATIONS, String.valueOf(CostEstimator.FACTOR_NUM_ITERATIONS));
+    //determine number of iterations
+    long N = OptimizerUtils.getNumIterations(fpb, vars, CostEstimator.FACTOR_NUM_ITERATIONS);
+    node.addParam(ParamType.NUM_ITERATIONS, String.valueOf(N));
     node.addChilds( createOptNodes( fpb.getFromInstructions(), vars,storeObjs ) );
     node.addChilds( createOptNodes( fpb.getToInstructions(), vars,storeObjs ) );
@@ -375,7 +376,9 @@ public class OptTreeConverter
     node.setExecType(ExecType.CP);
     node.setLineNumbers(fsb.getBeginLine(), fsb.getEndLine());
-    node.addParam(ParamType.NUM_ITERATIONS, String.valueOf(CostEstimator.FACTOR_NUM_ITERATIONS));
+    //determine number of iterations
+    long N = OptimizerUtils.getNumIterations(fpb, vars, CostEstimator.FACTOR_NUM_ITERATIONS);
+    node.addParam(ParamType.NUM_ITERATIONS, String.valueOf(N));
     //handle predicate
     fsb.getFromHops().resetVisitStatus();

http://git-wip-us.apache.org/repos/asf/systemml/blob/1d83cedb/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerConstrained.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerConstrained.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerConstrained.java
index fb83fd6..9e78c2a 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerConstrained.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerConstrained.java
@@ -154,14 +154,11 @@ public class OptimizerConstrained extends OptimizerRuleBased
     // rewrite 8: rewrite set partition replication factor
     super.rewriteSetExportReplicationFactor( pn, ec.getVariables() );
-    // rewrite 9: nested parallelism (incl exec types)
-    boolean flagNested = super.rewriteNestedParallelism( pn, M1, flagLIX );
-
     // rewrite 10: determine parallelism
-    rewriteSetDegreeOfParallelism( pn, M1, flagNested );
+    rewriteSetDegreeOfParallelism( pn, M1, false );
     // rewrite 11: task partitioning
-    rewriteSetTaskPartitioner( pn, flagNested, flagLIX );
+    rewriteSetTaskPartitioner( pn, false, flagLIX );
     // rewrite 12: fused data partitioning and execution
     rewriteSetFusedDataPartitioningExecution(pn, M1, flagLIX, partitionedMatrices, ec.getVariables(), tmpmode);
@@ -369,11 +366,10 @@ public class OptimizerConstrained extends OptimizerRuleBased
     if( emode == PExecMode.REMOTE_MR_DP || emode == PExecMode.REMOTE_SPARK_DP )
     {
       ParForProgramBlock pfpb = (ParForProgramBlock) OptTreeConverter
-        .getAbstractPlanMapping().getMappedProg(pn.getID())[1];
+        .getAbstractPlanMapping().getMappedProg(pn.getID())[1];
       //partitioned matrix
-      if( partitionedMatrices.size()<=0 )
-      {
+      if( partitionedMatrices.size()<=0 ) {
         LOG.debug(getOptMode()+" OPT: unable to force 'set fused data partitioning and execution' - result="+false );
         return;
       }
@@ -381,11 +377,8 @@ public class OptimizerConstrained extends OptimizerRuleBased
       String moVarname = partitionedMatrices.keySet().iterator().next();
       PartitionFormat moDpf = partitionedMatrices.get(moVarname);
       MatrixObject mo = (MatrixObject)vars.get(moVarname);
-
-      //check if access via iteration variable and sizes match
-      String iterVarname = pfpb.getIterablePredicateVars()[0];
-
-      if( rIsAccessByIterationVariable(pn, moVarname, iterVarname) &&
+
+      if( rIsAccessByIterationVariable(pn, moVarname, pfpb.getIterVar()) &&
         ((moDpf==PartitionFormat.ROW_WISE && mo.getNumRows()==_N ) ||
         (moDpf==PartitionFormat.COLUMN_WISE && mo.getNumColumns()==_N) ||
         (moDpf._dpf==PDataPartitionFormat.ROW_BLOCK_WISE_N && mo.getNumRows()<=_N*moDpf._N)||

http://git-wip-us.apache.org/repos/asf/systemml/blob/1d83cedb/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
index 118aede..018364f 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
@@ -155,7 +155,6 @@ public class OptimizerRuleBased extends Optimizer
 public static final int MAX_REPLICATION_FACTOR_PARTITIONING = 5;
 public static final int MAX_REPLICATION_FACTOR_EXPORT = 7;
 public static final boolean ALLOW_REMOTE_NESTED_PARALLELISM = false;
-public static final boolean APPLY_REWRITE_NESTED_PARALLELISM = false;
 public static final String FUNCTION_UNFOLD_NAMEPREFIX = "__unfold_";
 public static final boolean APPLY_REWRITE_UPDATE_INPLACE_INTERMEDIATE = true;
@@ -285,14 +284,11 @@ public class OptimizerRuleBased extends Optimizer
     // rewrite 8: rewrite set partition replication factor
     rewriteSetExportReplicationFactor( pn, ec.getVariables() );
-    // rewrite 9: nested parallelism (incl exec types)
-    boolean flagNested = rewriteNestedParallelism( pn, M1, flagLIX );
-
     // rewrite 10: determine parallelism
-    rewriteSetDegreeOfParallelism( pn, M1, flagNested );
+    rewriteSetDegreeOfParallelism( pn, M1, false );
     // rewrite 11: task partitioning
-    rewriteSetTaskPartitioner( pn, flagNested, flagLIX );
+    rewriteSetTaskPartitioner( pn, false, flagLIX );
     // rewrite 12: fused data partitioning and execution
     rewriteSetFusedDataPartitioningExecution(pn, M1, flagLIX, partitionedMatrices, ec.getVariables());
@@ -357,7 +353,7 @@ public class OptimizerRuleBased extends Optimizer
 protected void analyzeProblemAndInfrastructure( OptNode pn )
 {
-    _N = Long.parseLong(pn.getParam(ParamType.NUM_ITERATIONS));
+    _N = Long.parseLong(pn.getParam(ParamType.NUM_ITERATIONS));
     _Nmax = pn.getMaxProblemSize();
     _lk = InfrastructureAnalyzer.getLocalParallelism();
     _lkmaxCP = (int) Math.ceil( PAR_K_FACTOR * _lk );
@@ -617,9 +613,9 @@ public class OptimizerRuleBased extends Optimizer
     //determine if applicable
     boolean apply = M < _rm //ops fit in remote memory budget
       && !cand.isEmpty() //at least one MR
-      && isResultPartitionableAll(cand,pfpb.getResultVariables(),
-        vars, pfpb.getIterablePredicateVars()[0]); // check candidates
-
+      && isResultPartitionableAll(cand,pfpb.getResultVariables(),
+        vars, pfpb.getIterVar()); // check candidates
+
     //recompile LIX
     if( apply )
     {
@@ -1002,7 +998,7 @@ public class OptimizerRuleBased extends Optimizer
   {
     //find all candidates matrices (at least one partitioned access via iterVar)
     HashSet<String> cand = new HashSet<String>();
-    rFindDataColocationCandidates(n, cand, pfpb.getIterablePredicateVars()[0]);
+    rFindDataColocationCandidates(n, cand, pfpb.getIterVar());
     //select largest matrix for colocation (based on nnz to account for sparsity)
     long nnzMax = Long.MIN_VALUE;
@@ -1016,7 +1012,7 @@ public class OptimizerRuleBased extends Optimizer
           apply = true;
         }
       }
-    }
+    }
   }
   //modify the runtime plan (apply true if at least one candidate)
@@ -1093,7 +1089,7 @@ public class OptimizerRuleBased extends Optimizer
     if(((n.getExecType()==ExecType.MR && n.getParam(ParamType.DATA_PARTITIONER).equals(PDataPartitioner.REMOTE_MR.name()))
       || (n.getExecType()==ExecType.SPARK && n.getParam(ParamType.DATA_PARTITIONER).equals(PDataPartitioner.REMOTE_SPARK.name())))
       && n.hasNestedParallelism(false)
-      && n.hasNestedPartitionReads(false) )
+      && n.hasNestedPartitionReads(false) )
     {
       apply = true;
@@ -1179,73 +1175,6 @@ public class OptimizerRuleBased extends Optimizer
     _numEvaluatedPlans++;
     LOG.debug(getOptMode()+" OPT: rewrite 'set export replication factor' - result="+apply+((apply)?" ("+replication+")":"") );
 }
-
-
-  ///////
-  //REWRITE enable nested parallelism
-  ///
-
-  @SuppressWarnings("all")
-  protected boolean rewriteNestedParallelism(OptNode n, double M, boolean flagLIX )
-    throws DMLRuntimeException
-  {
-    boolean nested = false;
-
-    if( APPLY_REWRITE_NESTED_PARALLELISM
-      && !flagLIX // if not applied left indexing rewrite
-      && _N >= _rnk // at least exploit all nodes
-      && !n.hasNestedParallelism(false)// only for 1D problems, otherwise potentially bad load balance
-      && M * _lkmaxCP <= _rm ) // only if we can exploit full local parallelism in the map task JVM memory
-    {
-      //modify tree
-      ArrayList<OptNode> tmpOld = n.getChilds();
-      OptNode nest = new OptNode(NodeType.PARFOR, ExecType.CP);
-      ArrayList<OptNode> tmpNew = new ArrayList<OptNode>();
-      tmpNew.add(nest);
-      n.setChilds(tmpNew);
-      nest.setChilds(tmpOld);
-
-      //modify rtprog
-      long id = n.getID();
-      ParForProgramBlock pfpb = (ParForProgramBlock) OptTreeConverter
-        .getAbstractPlanMapping().getMappedProg(id)[1];
-      ArrayList<ProgramBlock> tmpPBOld = pfpb.getChildBlocks();
-
-      //create new program block structure and modify parameters (from, to, incr, types,)
-      String[] iterVars = pfpb.getIterablePredicateVars(); //from, to stay original
-      String[] iterVars2 = iterVars.clone(); //itervar, incr stay original
-      int outIncr = (int)Math.ceil(((double)_N)/_rnk);
-      iterVars[ 0 ] = ParForStatementBlock.INTERAL_FN_INDEX_ROW; // already checked for uniqueness in ParForStatementBlock
-      iterVars[ 3 ] = String.valueOf(outIncr);
-      iterVars2[ 1 ] = ParForStatementBlock.INTERAL_FN_INDEX_ROW; //sub start
-      iterVars2[ 2 ] = null;
-      HashMap<String,String> params = pfpb.getParForParams();
-      HashMap<String,String> params2 = (HashMap<String,String>)params.clone();
-      ParForProgramBlock pfpb2 = new ParForProgramBlock(pfpb.getProgram(),iterVars2, params2);
-      OptTreeConverter.getAbstractPlanMapping().putProgMapping(null, pfpb2, nest);
-
-      ArrayList<ProgramBlock> tmpPBNew = new ArrayList<ProgramBlock>();
-      tmpPBNew.add(pfpb2);
-      pfpb.setChildBlocks(tmpPBNew);
-      pfpb.setIterablePredicateVars(iterVars);
-      pfpb.setIncrementInstructions(new ArrayList<Instruction>());
-      pfpb.setExecMode(PExecMode.REMOTE_MR);
-      pfpb2.setChildBlocks(tmpPBOld);
-      pfpb2.setResultVariables(pfpb.getResultVariables());
-      pfpb2.setFromInstructions(new ArrayList<Instruction>());
-      pfpb2.setToInstructions(ProgramRecompiler.createNestedParallelismToInstructionSet( ParForStatementBlock.INTERAL_FN_INDEX_ROW, String.valueOf(outIncr-1) ));
-      pfpb2.setIncrementInstructions(new ArrayList<Instruction>());
-      pfpb2.setExecMode(PExecMode.LOCAL);
-
-      nested = true;
-    }
-
-    _numEvaluatedPlans++;
-    LOG.debug(getOptMode()+" OPT: rewrite 'enable nested parallelism' - result="+nested );
-
-    return nested;
-  }
-
 ///////
 //REWRITE set degree of parallelism
@@ -1526,10 +1455,7 @@ public class OptimizerRuleBased extends Optimizer
     PartitionFormat moDpf = partitionedMatrices.get(moVarname);
     MatrixObject mo = (MatrixObject)vars.get(moVarname);
-    //check if access via iteration variable and sizes match
-    String iterVarname = pfpb.getIterablePredicateVars()[0];
-
-    if( rIsAccessByIterationVariable(pn, moVarname, iterVarname) &&
+    if( rIsAccessByIterationVariable(pn, moVarname, pfpb.getIterVar()) &&
       ((moDpf==PartitionFormat.ROW_WISE && mo.getNumRows()==_N ) ||
       (moDpf==PartitionFormat.COLUMN_WISE && mo.getNumColumns()==_N) ||
      (moDpf._dpf==PDataPartitionFormat.ROW_BLOCK_WISE_N && mo.getNumRows()<=_N*moDpf._N)||
@@ -1540,7 +1466,7 @@ public class OptimizerRuleBased extends Optimizer
     pn.addParam(ParamType.DATA_PARTITIONER, REMOTE_DPE.toString()+"(fused)");
     pn.setK( k );
-    pfpb.setExecMode(REMOTE_DPE); //set fused exec type
+    pfpb.setExecMode(REMOTE_DPE); //set fused exec type
     pfpb.setDataPartitioner(PDataPartitioner.NONE);
     pfpb.enableColocatedPartitionedMatrix( moVarname );
     pfpb.setDegreeOfParallelism(k);
@@ -2815,7 +2741,7 @@ public class OptimizerRuleBased extends Optimizer
     ArrayList<String> cleanedVars = new ArrayList<String>();
     ArrayList<String> resultVars = pfpb.getResultVariables();
-    String itervar = pfpb.getIterablePredicateVars()[0];
+    String itervar = pfpb.getIterVar();
     for( String rvar : resultVars ) {
       Data dat = ec.getVariable(rvar);

http://git-wip-us.apache.org/repos/asf/systemml/blob/1d83cedb/src/test/java/org/apache/sysml/test/gpu/RightIndexingTests.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/gpu/RightIndexingTests.java b/src/test/java/org/apache/sysml/test/gpu/RightIndexingTests.java
index e891b7f..90b08ae 100644
--- a/src/test/java/org/apache/sysml/test/gpu/RightIndexingTests.java
+++ b/src/test/java/org/apache/sysml/test/gpu/RightIndexingTests.java
@@ -57,7 +57,7 @@ public class RightIndexingTests extends GPUTests {
       for (int k = 0; k < sparsities.length; k++) {
         double sparsity = sparsities[k];
         Matrix X = generateInputMatrix(spark, dim1, dim2, sparsity, seed);
-        Matrix Y = generateInputMatrix(spark, dim1, dim2, sparsity, seed);
+        //FIXME Matrix Y = generateInputMatrix(spark, dim1, dim2, sparsity, seed);
         HashMap<String, Object> inputs = new HashMap<>();
         inputs.put("X", X);
         String scriptStr = "O = X[" + rl + ":" + ru + "," + cl + ":" + cu + "];";

http://git-wip-us.apache.org/repos/asf/systemml/blob/1d83cedb/src/test/java/org/apache/sysml/test/integration/functions/recompile/SparsityRecompileTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/recompile/SparsityRecompileTest.java b/src/test/java/org/apache/sysml/test/integration/functions/recompile/SparsityRecompileTest.java
index 923dc47..c8a883a 100644
--- a/src/test/java/org/apache/sysml/test/integration/functions/recompile/SparsityRecompileTest.java
+++ b/src/test/java/org/apache/sysml/test/integration/functions/recompile/SparsityRecompileTest.java
@@ -47,8 +47,8 @@ public class SparsityRecompileTest extends AutomatedTestBase
 private final static String TEST_CLASS_DIR = TEST_DIR + SparsityRecompileTest.class.getSimpleName() + "/";
 private final static long rows = 1000;
-private final static long cols = 500000;
-private final static double sparsity = 0.00001d;
+private final static long cols = 500000;
+private final static double sparsity = 0.00001d;
 private final static double val = 7.0;
 @Override

http://git-wip-us.apache.org/repos/asf/systemml/blob/1d83cedb/src/test/java/org/apache/sysml/test/integration/functions/unary/matrix/SVDFactorizeTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/unary/matrix/SVDFactorizeTest.java b/src/test/java/org/apache/sysml/test/integration/functions/unary/matrix/SVDFactorizeTest.java
index 4f09b74..8623d32 100644
--- a/src/test/java/org/apache/sysml/test/integration/functions/unary/matrix/SVDFactorizeTest.java
+++ b/src/test/java/org/apache/sysml/test/integration/functions/unary/matrix/SVDFactorizeTest.java
@@ -29,7 +29,6 @@ import org.apache.sysml.test.integration.TestConfiguration;
 public class SVDFactorizeTest extends AutomatedTestBase
 {
-
 private final static String TEST_NAME1 = "svd";
 private final static String TEST_DIR = "functions/unary/matrix/";
 private static final String TEST_CLASS_DIR = TEST_DIR + SVDFactorizeTest.class.getSimpleName() + "/";
@@ -41,59 +40,48 @@ public class SVDFactorizeTest extends AutomatedTestBase
 private final static double sparsity = 0.9;
 @Override
-public void setUp()
-{
-  addTestConfiguration(
-    TEST_NAME1,
-    new TestConfiguration(TEST_CLASS_DIR, TEST_NAME1,
-    new String[] { "D" }) );
+public void setUp() {
+  addTestConfiguration(TEST_NAME1,
+    new TestConfiguration(TEST_CLASS_DIR, TEST_NAME1, new String[] { "D" }) );
 }
 @Test
-public void testSVDFactorizeDenseCP()
-{
+public void testSVDFactorizeDenseCP() {
   runTestSVDFactorize( rows1, cols1, RUNTIME_PLATFORM.SINGLE_NODE );
 }
 @Test
-public void testSVDFactorizeDenseSP()
-{
+public void testSVDFactorizeDenseSP() {
   runTestSVDFactorize( rows1, cols1, RUNTIME_PLATFORM.SPARK );
 }
 @Test
-public void testSVDFactorizeDenseMR()
-{
+public void testSVDFactorizeDenseMR() {
   runTestSVDFactorize( rows1, cols1, RUNTIME_PLATFORM.HADOOP );
 }
 @Test
-public void testSVDFactorizeDenseHybrid()
-{
+public void testSVDFactorizeDenseHybrid() {
   runTestSVDFactorize( rows1, cols1, RUNTIME_PLATFORM.HYBRID );
 }
 @Test
-public void testLargeSVDFactorizeDenseCP()
-{
+public void testLargeSVDFactorizeDenseCP() {
   runTestSVDFactorize( rows2, cols2, RUNTIME_PLATFORM.SINGLE_NODE );
 }
 @Test
-public void testLargeSVDFactorizeDenseSP()
-{
+public void testLargeSVDFactorizeDenseSP() {
   runTestSVDFactorize( rows2, cols2, RUNTIME_PLATFORM.SPARK );
 }
 @Test
-public void testLargeSVDFactorizeDenseMR()
-{
+public void testLargeSVDFactorizeDenseMR() {
   runTestSVDFactorize( rows2, cols2, RUNTIME_PLATFORM.HADOOP );
 }
 @Test
-public void testLargeSVDFactorizeDenseHybrid()
-{
+public void testLargeSVDFactorizeDenseHybrid() {
   runTestSVDFactorize( rows2, cols2, RUNTIME_PLATFORM.HYBRID );
 }
@@ -103,8 +91,8 @@ public class SVDFactorizeTest extends AutomatedTestBase
   rtplatform = rt;
   boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG;
-  if( rtplatform == RUNTIME_PLATFORM.SPARK )
-    DMLScript.USE_LOCAL_SPARK_CONFIG = true;
+  if( rtplatform == RUNTIME_PLATFORM.SPARK )
+    DMLScript.USE_LOCAL_SPARK_CONFIG = true;
   try
   {
@@ -127,9 +115,10 @@ public class SVDFactorizeTest extends AutomatedTestBase
     runTest(true, exceptionExpected, null, -1);
     compareResults(1e-8);
   }
-  finally
-  {
+  finally {
+    DMLScript.USE_LOCAL_SPARK_CONFIG = sparkConfigOld;
     rtplatform = rtold;
+  }
 }
