[SYSTEMML-1310] New parfor block partitioning (compiler/runtime/tests) This patch extends the parfor optimizer and runtime with support for block partitioning, i.e., the partitioning and access of batches of rows or columns. Batches must be aligned and smaller than or equal to the blocksize in the partitioning dimension, but can be arbitrarily large in the other dimension. So far, only the remote Spark data partitioner supports these new partitioning formats, but the resulting partitions can be used by both local and remote parfor operators.
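For illustration, the following DML sketch (hypothetical, not part of this patch; variable names and sizes are made up) shows an access pattern that the extended dependency analysis can now classify as block-wise: with the default blocksize of 1000, the column range below selects aligned 4-column batches (the batch width divides the blocksize and is not larger than it), so under Spark execution the optimizer may choose COLUMN_BLOCK_WISE_N partitioning instead of falling back to NONE.

# hypothetical example script, assuming default blocksize 1000
X = rand(rows=10000, cols=4000);
R = matrix(0, rows=1, cols=1000);
parfor( i in 1:1000 ) {
  Xb = X[, (i-1)*4+1:i*4];   # aligned batch of 4 columns per iteration
  R[1,i] = sum(Xb);
}
print(sum(R));

A row-wise batch access such as X[(i-1)*4+1:i*4, ] would be handled analogously via ROW_BLOCK_WISE_N.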
Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/39f75ca0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/39f75ca0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/39f75ca0 Branch: refs/heads/master Commit: 39f75ca06012f192d6a8bdea33f1be0bd89fbd8f Parents: f693bca Author: Matthias Boehm <[email protected]> Authored: Tue Mar 21 00:13:27 2017 -0700 Committer: Matthias Boehm <[email protected]> Committed: Tue Mar 21 12:38:24 2017 -0700 ---------------------------------------------------------------------- .../sysml/parser/ParForStatementBlock.java | 245 ++++++++++--------- .../controlprogram/ParForProgramBlock.java | 82 +++++-- .../controlprogram/caching/MatrixObject.java | 16 ++ .../controlprogram/parfor/DataPartitioner.java | 3 +- .../parfor/DataPartitionerLocal.java | 9 +- .../parfor/DataPartitionerRemoteMR.java | 8 +- .../parfor/DataPartitionerRemoteSpark.java | 8 +- .../DataPartitionerRemoteSparkMapper.java | 58 +++-- .../controlprogram/parfor/ProgramConverter.java | 10 +- .../controlprogram/parfor/RemoteDPParForMR.java | 12 +- .../parfor/RemoteDPParForSpark.java | 21 +- .../parfor/RemoteDPParForSparkWorker.java | 9 +- .../controlprogram/parfor/opt/OptNode.java | 3 +- .../parfor/opt/OptimizerConstrained.java | 14 +- .../parfor/opt/OptimizerRuleBased.java | 57 +++-- .../cpfile/MatrixIndexingCPFileInstruction.java | 8 +- .../sysml/runtime/matrix/DataPartitionMR.java | 5 +- .../ParForBlockwiseDataPartitioningTest.java | 215 ++++++++++++++++ .../parfor/parfor_bcdatapartitioning_neg.R | 37 +++ .../parfor/parfor_bcdatapartitioning_neg.dml | 37 +++ .../parfor/parfor_bcdatapartitioning_pos.R | 37 +++ .../parfor/parfor_bcdatapartitioning_pos.dml | 37 +++ .../parfor/parfor_brdatapartitioning_neg.R | 37 +++ .../parfor/parfor_brdatapartitioning_neg.dml | 37 +++ .../parfor/parfor_brdatapartitioning_pos.R | 37 +++ .../parfor/parfor_brdatapartitioning_pos.dml | 37 +++ 26 files changed, 851 insertions(+), 228 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/39f75ca0/src/main/java/org/apache/sysml/parser/ParForStatementBlock.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/parser/ParForStatementBlock.java b/src/main/java/org/apache/sysml/parser/ParForStatementBlock.java index 5ca0cbc..784df63 100644 --- a/src/main/java/org/apache/sysml/parser/ParForStatementBlock.java +++ b/src/main/java/org/apache/sysml/parser/ParForStatementBlock.java @@ -30,7 +30,8 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.log4j.Level; import org.apache.log4j.Logger; - +import org.apache.sysml.conf.ConfigurationManager; +import org.apache.sysml.hops.OptimizerUtils; import org.apache.sysml.parser.Expression.BinaryOp; import org.apache.sysml.parser.Expression.BuiltinFunctionOp; import org.apache.sysml.parser.Expression.DataType; @@ -41,6 +42,7 @@ import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PExecMode; import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.POptMode; import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PResultMerge; import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PTaskPartitioner; +import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PartitionFormat; import 
org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer; import org.apache.sysml.runtime.controlprogram.parfor.stat.Timing; import org.apache.sysml.runtime.controlprogram.parfor.util.IDSequence; @@ -403,43 +405,27 @@ public class ParForStatementBlock extends ForStatementBlock * @param var variables * @return partition format */ - public PDataPartitionFormat determineDataPartitionFormat(String var) + public PartitionFormat determineDataPartitionFormat(String var) { - PDataPartitionFormat dpf = null; - List<PDataPartitionFormat> dpfc = new LinkedList<PDataPartitionFormat>(); + PartitionFormat dpf = null; + List<PartitionFormat> dpfc = new LinkedList<PartitionFormat>(); try { //determine partitioning candidates - ParForStatement pfs = (ParForStatement) _statements.get(0); - rDeterminePartitioningCandidates(var, pfs.getBody(), dpfc); + ParForStatement dpfs = (ParForStatement) _statements.get(0); + rDeterminePartitioningCandidates(var, dpfs.getBody(), dpfc); //determine final solution - for( PDataPartitionFormat tmp : dpfc ) - { - //System.out.println(var+": "+tmp); - if( dpf != null && dpf!=tmp ) //if no consensus - dpf = PDataPartitionFormat.NONE; - else - dpf = tmp; - - /* TODO block partitioning - if( dpf == null || dpf==tmp ) //consensus - dpf = tmp; - else if( dpf==PDataPartitionFormat.BLOCK_WISE_M_N //subsumption - || tmp==PDataPartitionFormat.BLOCK_WISE_M_N ) - dpf = PDataPartitionFormat.BLOCK_WISE_M_N; - else //no consensus - dpf = PDataPartitionFormat.NONE; - */ - } + for( PartitionFormat tmp : dpfc ) + dpf = ( dpf!=null && !dpf.equals(tmp) ) ? //if no consensus + PartitionFormat.NONE : tmp; if( dpf == null ) - dpf = PDataPartitionFormat.NONE; + dpf = PartitionFormat.NONE; } - catch (LanguageException e) - { + catch (LanguageException e) { LOG.trace( "Unable to determine partitioning candidates.", e ); - dpf = PDataPartitionFormat.NONE; + dpf = PartitionFormat.NONE; } return dpf; @@ -520,7 +506,7 @@ public class ParForStatementBlock extends ForStatementBlock * @param C list of partition formats * @throws LanguageException if LanguageException occurs */ - private void rDeterminePartitioningCandidates(String var, ArrayList<StatementBlock> asb, List<PDataPartitionFormat> C) + private void rDeterminePartitioningCandidates(String var, ArrayList<StatementBlock> asb, List<PartitionFormat> C) throws LanguageException { for(StatementBlock sb : asb ) // foreach statementblock in parforbody @@ -570,61 +556,77 @@ public class ParForStatementBlock extends ForStatementBlock } } - private void rDeterminePartitioningCandidates(String var, List<DataIdentifier> datsRead, List<PDataPartitionFormat> C) + private void rDeterminePartitioningCandidates(String var, List<DataIdentifier> datsRead, List<PartitionFormat> C) { - if( datsRead != null ) - for(DataIdentifier read : datsRead) - { - String readStr = read.getName(); - if( var.equals( readStr ) ) - { - if( read instanceof IndexedIdentifier ) - { - IndexedIdentifier idat = (IndexedIdentifier) read; - C.add( determineAccessPattern(idat) ); - } - else if( read instanceof DataIdentifier ) - { - C.add( PDataPartitionFormat.NONE ); - } - } + if( datsRead == null ) + return; + + for(DataIdentifier read : datsRead) + if( var.equals( read.getName() ) ) { + if( read instanceof IndexedIdentifier ) + C.add( determineAccessPattern((IndexedIdentifier) read) ); + else if( read instanceof DataIdentifier ) + C.add( PartitionFormat.NONE ); } } - private PDataPartitionFormat determineAccessPattern( IndexedIdentifier dat ) + private PartitionFormat 
determineAccessPattern( IndexedIdentifier dat ) { - PDataPartitionFormat dpf = null; + boolean isSpark = OptimizerUtils.isSparkExecutionMode(); + int blksz = ConfigurationManager.getBlocksize(); + PartitionFormat dpf = null; //1) get all bounds expressions for index access Expression rowL = dat.getRowLowerBound(); Expression rowU = dat.getRowUpperBound(); Expression colL = dat.getColLowerBound(); Expression colU = dat.getColUpperBound(); - - //2) decided on access pattern - //COLUMN_WISE iff row expr is colon (all rows) - // and access to single column - if( rowL == null && rowU == null && - colL!=null && colU != null && colL.equals(colU) ) - { - dpf = PDataPartitionFormat.COLUMN_WISE; - } - //ROW_WISE iff col expr is colon (all columns) - // and access to single row - else if( colL == null && colU == null && - rowL!=null && rowU != null && rowL.equals(rowU) ) - { - dpf = PDataPartitionFormat.ROW_WISE; + boolean allRows = (rowL == null && rowU == null); + boolean allCols = (colL == null && colU == null); + + try { + //2) decided on access pattern + //COLUMN_WISE if all rows and access to single column + if( allRows && colL!=null && colL.equals(colU) ) { + dpf = PartitionFormat.COLUMN_WISE; + } + //ROW_WISE if all cols and access to single row + else if( allCols && rowL!=null && rowL.equals(rowU) ) { + dpf = PartitionFormat.ROW_WISE; + } + //COLUMN_BLOCK_WISE + else if( isSpark && allRows && colL != colU ) { + LinearFunction l1 = getLinearFunction(colL, true); + LinearFunction l2 = getLinearFunction(colU, true); + dpf = !isAlignedBlocking(l1, l2, blksz) ? PartitionFormat.NONE : + new PartitionFormat(PDataPartitionFormat.COLUMN_BLOCK_WISE_N, (int)l1._b[0]); + } + //ROW_BLOCK_WISE + else if( isSpark && allCols && rowL != rowU ) { + LinearFunction l1 = getLinearFunction(rowL, true); + LinearFunction l2 = getLinearFunction(rowU, true); + dpf = !isAlignedBlocking(l1, l2, blksz) ? 
PartitionFormat.NONE : + new PartitionFormat(PDataPartitionFormat.ROW_BLOCK_WISE_N, (int)l1._b[0]); + } + //NONE otherwise (conservative) + else + dpf = PartitionFormat.NONE; + } + catch(Exception ex) { + throw new RuntimeException(ex); } - //NONE otherwise (conservative) - else - dpf = PDataPartitionFormat.NONE; - - //TODO block partitioning return dpf; } + private static boolean isAlignedBlocking(LinearFunction l1, LinearFunction l2, int blksz) { + return (l1!=null && l2!=null && l1.equalSlope(l2) //same slope + && l1._b.length==1 && l1._b[0]<=blksz //single index and block + && (l2._a - l1._a + 1 == l1._b[0]) //intercept difference is slope + && (blksz/l1._b[0])*l1._b[0] == blksz //aligned slope + && l2.eval(1L) == l1._b[0] ); //aligned intercept + } + private void rConsolidateResultVars(ArrayList<StatementBlock> asb, ArrayList<String> vars) throws LanguageException { @@ -1718,6 +1720,28 @@ public class ParForStatementBlock extends ForStatementBlock return out; } + private LinearFunction getLinearFunction(Expression expr, boolean ignoreMinWithConstant) + throws LanguageException { + if( expr instanceof IntIdentifier ) + return new LinearFunction(((IntIdentifier)expr).getValue(), 0, null); + else if( expr instanceof BinaryExpression ) + return rParseBinaryExpression((BinaryExpression)expr); + else if( expr instanceof BuiltinFunctionExpression && ignoreMinWithConstant ) { + //note: builtin function expression is also a data identifier and hence order before + BuiltinFunctionExpression bexpr = (BuiltinFunctionExpression) expr; + if( bexpr.getOpCode()==BuiltinFunctionOp.MIN ) { + if( bexpr.getFirstExpr() instanceof BinaryExpression ) + return rParseBinaryExpression((BinaryExpression)bexpr.getFirstExpr()); + else if( bexpr.getSecondExpr() instanceof BinaryExpression ) + return rParseBinaryExpression((BinaryExpression)bexpr.getSecondExpr()); + } + } + else if( expr instanceof DataIdentifier ) + return new LinearFunction(0, 1, ((DataIdentifier)expr)._name); //never use public members + + return null; + } + /** * Creates a functionID for a given data identifier (mainly used for caching purposes), * where data identifiers with equal name and matrix subscripts results in equal @@ -1931,16 +1955,25 @@ public class ParForStatementBlock extends ForStatementBlock } else if( be.getOpCode() == BinaryOp.MULT ) { - //NOTE: no recursion for MULT expressions + //NOTE: only recursion for MULT expressions, where + //one side is a constant //atomic case Long cvalL = parseLongConstant(l); Long cvalR = parseLongConstant(r); - if( cvalL != null ) + if( cvalL != null && r instanceof DataIdentifier ) ret = new LinearFunction(0, cvalL,((DataIdentifier)r)._name); - else if( cvalR != null ) + else if( cvalR != null && l instanceof DataIdentifier ) ret = new LinearFunction(0, cvalR,((DataIdentifier)l)._name); + else if( cvalL != null && r instanceof BinaryExpression ) { + LinearFunction ltmp = rParseBinaryExpression((BinaryExpression)r); + return ltmp.scale(cvalL); + } + else if( cvalR != null && l instanceof BinaryExpression ) { + LinearFunction ltmp = rParseBinaryExpression((BinaryExpression)l); + return ltmp.scale(cvalR); + } else return null; //let dependency analysis fail } @@ -2014,8 +2047,7 @@ public class ParForStatementBlock extends ForStatementBlock _vars[0] = name; } - public void addConstant(long value) - { + public void addConstant(long value) { _a += value; } @@ -2048,19 +2080,26 @@ public class ParForStatementBlock extends ForStatementBlock _vars = tmpvars; } - public void scale( long scale ) - { - _a 
*= scale; //-1 because indexing starts at 1 - + public LinearFunction scale( long scale ) { + _a *= scale; for( int i=0; i<_b.length; i++ ) if( _b[i] != 0 ) _b[i] *= scale; + return this; } - public void normalize(int index, long lower, long increment) - { + public LinearFunction normalize(int index, long lower, long increment) { _a -= (_b[index] * lower); _b[index] *= increment; + return this; + } + + public long eval(Long... x) { + long ret = _a; + for( int i=0; i<_b.length; i++ ) + if( _b[i] != 0 ) + ret += _b[i] *= x[i]; + return ret; } @Override @@ -2087,48 +2126,36 @@ public class ParForStatementBlock extends ForStatementBlock } @Override - public boolean equals( Object o2 ) - { + public boolean equals( Object o2 ) { if( o2 == null || !(o2 instanceof LinearFunction) ) return false; LinearFunction f2 = (LinearFunction)o2; - boolean ret = true; - ret &= ( _a == f2._a ); - ret &= ( _b.length == f2._b.length ); - - if( ret ) - { - for( int i=0; i<_b.length; i++ ) - { - ret &= (_b[i] == f2._b[i] ); - ret &= (_vars[i].equals(f2._vars[i]) - ||(_vars[i].startsWith(INTERAL_FN_INDEX_ROW) && f2._vars[i].startsWith(INTERAL_FN_INDEX_ROW)) - ||(_vars[i].startsWith(INTERAL_FN_INDEX_COL) && f2._vars[i].startsWith(INTERAL_FN_INDEX_COL)) ) ; - } + return ( _a == f2._a ) + && equalSlope(f2); + } + + public boolean equalSlope(LinearFunction f2) { + boolean ret = ( _b.length == f2._b.length ); + for( int i=0; i<_b.length && ret; i++ ) { + ret &= (_b[i] == f2._b[i] ); + ret &= (_vars[i].equals(f2._vars[i]) + ||(_vars[i].startsWith(INTERAL_FN_INDEX_ROW) && f2._vars[i].startsWith(INTERAL_FN_INDEX_ROW)) + ||(_vars[i].startsWith(INTERAL_FN_INDEX_COL) && f2._vars[i].startsWith(INTERAL_FN_INDEX_COL)) ) ; } - return ret; } - + @Override - public int hashCode() - { - //use identity hash code - return super.hashCode(); + public int hashCode() { + return super.hashCode(); //identity } - public boolean hasNonIndexVariables() - { - boolean ret = false; + public boolean hasNonIndexVariables() { for( String var : _vars ) if( var!=null && !_bounds._lower.containsKey(var) ) - { - ret = true; - break; - } - - return ret; + return true; + return false; } } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/39f75ca0/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java b/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java index 0e3574f..a6b8e0e 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java @@ -22,6 +22,7 @@ package org.apache.sysml.runtime.controlprogram; import java.io.BufferedWriter; import java.io.IOException; import java.io.OutputStreamWriter; +import java.io.Serializable; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -179,6 +180,42 @@ public class ParForProgramBlock extends ForProgramBlock } } + /** + * Convenience class to package PDataPartitionFormat and its parameters. 
+ */ + public static class PartitionFormat implements Serializable { + private static final long serialVersionUID = 4729309847778707801L; + public static final PartitionFormat NONE = new PartitionFormat(PDataPartitionFormat.NONE, -1); + public static final PartitionFormat ROW_WISE = new PartitionFormat(PDataPartitionFormat.ROW_WISE, -1); + public static final PartitionFormat COLUMN_WISE = new PartitionFormat(PDataPartitionFormat.COLUMN_WISE, -1); + + public final PDataPartitionFormat _dpf; + public final int _N; + public PartitionFormat(PDataPartitionFormat dpf, int N) { + _dpf = dpf; + _N = N; + } + @Override + public boolean equals(Object o) { + return (o instanceof PartitionFormat) + && _dpf == ((PartitionFormat)o)._dpf + && _N == ((PartitionFormat)o)._N; + } + @Override + public String toString() { + return _dpf.name()+","+_N; + } + public static PartitionFormat valueOf(String value) { + String[] parts = value.split(","); + return new PartitionFormat(PDataPartitionFormat + .parsePDataPartitionFormat(parts[0]), Integer.parseInt(parts[1])); + } + public boolean isBlockwise() { + return _dpf == PDataPartitionFormat.COLUMN_BLOCK_WISE_N + || _dpf == PDataPartitionFormat.ROW_BLOCK_WISE_N; + } + } + public enum PDataPartitioner { NONE, // no data partitioning LOCAL, // local file based partition split on master node @@ -893,11 +930,11 @@ public class ParForProgramBlock extends ForProgramBlock // Step 0) check and compile to CP (if forced remote parfor) boolean flagForced = checkMRAndRecompileToCP(0); - // Step 1) prepare partitioned input matrix (needs to happen before serializing the progam) + // Step 1) prepare partitioned input matrix (needs to happen before serializing the program) ParForStatementBlock sb = (ParForStatementBlock) getStatementBlock(); MatrixObject inputMatrix = ec.getMatrixObject(_colocatedDPMatrix ); - PDataPartitionFormat inputDPF = sb.determineDataPartitionFormat( _colocatedDPMatrix ); - inputMatrix.setPartitioned(inputDPF, 1); //mark matrix var as partitioned (for reducers) + PartitionFormat inputDPF = sb.determineDataPartitionFormat( _colocatedDPMatrix ); + inputMatrix.setPartitioned(inputDPF._dpf, inputDPF._N); //mark matrix var as partitioned // Step 2) init parallel workers (serialize PBs) // NOTES: each mapper changes filenames with regard to his ID as we submit a single job, @@ -921,8 +958,8 @@ public class ParForProgramBlock extends ForProgramBlock exportMatricesToHDFS(ec); // Step 4) submit MR job (wait for finished work) - OutputInfo inputOI = ((inputMatrix.getSparsity()<0.1 && inputDPF==PDataPartitionFormat.COLUMN_WISE)|| - (inputMatrix.getSparsity()<0.001 && inputDPF==PDataPartitionFormat.ROW_WISE))? + OutputInfo inputOI = ((inputMatrix.getSparsity()<0.1 && inputDPF==PartitionFormat.COLUMN_WISE)|| + (inputMatrix.getSparsity()<0.001 && inputDPF==PartitionFormat.ROW_WISE))? 
OutputInfo.BinaryCellOutputInfo : OutputInfo.BinaryBlockOutputInfo; RemoteParForJobReturn ret = RemoteDPParForMR.runJob(_ID, itervar.getName(), _colocatedDPMatrix, program, resultFile, inputMatrix, inputDPF, inputOI, _tSparseCol, _enableCPCaching, _numThreads, _replicationDP, MAX_RETRYS_ON_ERROR ); @@ -1024,8 +1061,8 @@ public class ParForProgramBlock extends ForProgramBlock // Step 1) prepare partitioned input matrix (needs to happen before serializing the progam) ParForStatementBlock sb = (ParForStatementBlock) getStatementBlock(); MatrixObject inputMatrix = ec.getMatrixObject(_colocatedDPMatrix ); - PDataPartitionFormat inputDPF = sb.determineDataPartitionFormat( _colocatedDPMatrix ); - inputMatrix.setPartitioned(inputDPF, 1); //mark matrix var as partitioned (for reducers) + PartitionFormat inputDPF = sb.determineDataPartitionFormat( _colocatedDPMatrix ); + inputMatrix.setPartitioned(inputDPF._dpf, inputDPF._N); //mark matrix var as partitioned // Step 2) init parallel workers (serialize PBs) // NOTES: each mapper changes filenames with regard to his ID as we submit a single job, @@ -1084,7 +1121,8 @@ public class ParForProgramBlock extends ForProgramBlock private void handleDataPartitioning( ExecutionContext ec ) throws DMLRuntimeException { - if( _dataPartitioner != PDataPartitioner.NONE ) + PDataPartitioner dataPartitioner = _dataPartitioner; + if( dataPartitioner != PDataPartitioner.NONE ) { ParForStatementBlock sb = (ParForStatementBlock) getStatementBlock(); if( sb == null ) @@ -1100,19 +1138,24 @@ public class ParForProgramBlock extends ForProgramBlock { MatrixObject moVar = (MatrixObject) dat; //unpartitioned input - PDataPartitionFormat dpf = sb.determineDataPartitionFormat( var ); - //dpf = (_optMode != POptMode.NONE) ? OptimizerRuleBased.decideBlockWisePartitioning(moVar, dpf) : dpf; + PartitionFormat dpf = sb.determineDataPartitionFormat( var ); LOG.trace("PARFOR ID = "+_ID+", Partitioning read-only input variable "+var+" (format="+dpf+", mode="+_dataPartitioner+")"); - if( dpf != PDataPartitionFormat.NONE ) + if( dpf != PartitionFormat.NONE ) { + if( dataPartitioner != PDataPartitioner.REMOTE_SPARK && dpf.isBlockwise() ) { + LOG.warn("PARFOR ID = "+_ID+", Switching data partitioner from " + dataPartitioner + + " to " + PDataPartitioner.REMOTE_SPARK.name()+" for blockwise-n partitioning."); + dataPartitioner = PDataPartitioner.REMOTE_SPARK; + } + Timing ltime = new Timing(true); //input data partitioning (reuse if possible) Data dpdatNew = _variablesDPReuse.get(var); if( dpdatNew == null ) //no reuse opportunity { - DataPartitioner dp = createDataPartitioner( dpf, _dataPartitioner, ec ); + DataPartitioner dp = createDataPartitioner( dpf, dataPartitioner, ec ); //disable binary cell for sparse if consumed by MR jobs if( !OptimizerRuleBased.allowsBinaryCellPartitions(moVar, dpf ) || OptimizerUtils.isSparkExecutionMode() ) //TODO support for binarycell @@ -1422,7 +1465,7 @@ public class ParForProgramBlock extends ForProgramBlock * @return data partitioner * @throws DMLRuntimeException if DMLRuntimeException occurs */ - private DataPartitioner createDataPartitioner(PDataPartitionFormat dpf, PDataPartitioner dataPartitioner, ExecutionContext ec) + private DataPartitioner createDataPartitioner(PartitionFormat dpf, PDataPartitioner dataPartitioner, ExecutionContext ec) throws DMLRuntimeException { DataPartitioner dp = null; @@ -1439,19 +1482,17 @@ public class ParForProgramBlock extends ForProgramBlock switch( dataPartitioner ) { case LOCAL: - dp = new DataPartitionerLocal(dpf, 
-1, _numThreads); + dp = new DataPartitionerLocal(dpf, _numThreads); break; case REMOTE_MR: - dp = new DataPartitionerRemoteMR( dpf, -1, _ID, numRed, - _replicationDP, - MAX_RETRYS_ON_ERROR, - ALLOW_REUSE_MR_JVMS, false ); + dp = new DataPartitionerRemoteMR( dpf, _ID, numRed, + _replicationDP, MAX_RETRYS_ON_ERROR, ALLOW_REUSE_MR_JVMS, false ); break; case REMOTE_SPARK: - dp = new DataPartitionerRemoteSpark( dpf, -1, ec, numRed, false ); + dp = new DataPartitionerRemoteSpark( dpf, ec, numRed, false ); break; default: - throw new DMLRuntimeException("Undefined data partitioner: '" +dataPartitioner.toString()+"'."); + throw new DMLRuntimeException("Unknown data partitioner: '" +dataPartitioner.name()+"'."); } return dp; @@ -1983,7 +2024,6 @@ public class ParForProgramBlock extends ForProgramBlock } } } - public String printBlockErrorLocation(){ return "ERROR: Runtime error in parfor program block generated from parfor statement block between lines " + _beginLine + " and " + _endLine + " -- "; http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/39f75ca0/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java index 48855ee..dc3cfd1 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java @@ -293,6 +293,10 @@ public class MatrixObject extends CacheableData<MatrixBlock> rows = brlen; cols = mc.getCols(); break; + case ROW_BLOCK_WISE_N: + rows = _partitionSize; + cols = mc.getCols(); + break; case COLUMN_WISE: rows = mc.getRows(); cols = 1; @@ -301,6 +305,10 @@ public class MatrixObject extends CacheableData<MatrixBlock> rows = mc.getRows(); cols = bclen; break; + case COLUMN_BLOCK_WISE_N: + rows = mc.getRows(); + cols = _partitionSize; + break; default: throw new CacheException("Unsupported partition format: "+_partitionFormat); } @@ -370,6 +378,10 @@ public class MatrixObject extends CacheableData<MatrixBlock> sb.append(Lop.FILE_SEPARATOR); sb.append((pred.rowStart-1)/brlen+1); break; + case ROW_BLOCK_WISE_N: + sb.append(Lop.FILE_SEPARATOR); + sb.append((pred.rowStart-1)/_partitionSize+1); + break; case COLUMN_WISE: sb.append(Lop.FILE_SEPARATOR); sb.append(pred.colStart); @@ -378,6 +390,10 @@ public class MatrixObject extends CacheableData<MatrixBlock> sb.append(Lop.FILE_SEPARATOR); sb.append((pred.colStart-1)/bclen+1); break; + case COLUMN_BLOCK_WISE_N: + sb.append(Lop.FILE_SEPARATOR); + sb.append((pred.colStart-1)/_partitionSize+1); + break; default: throw new CacheException ("MatrixObject not available to indexed read."); } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/39f75ca0/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitioner.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitioner.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitioner.java index ab76b55..d836e18 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitioner.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitioner.java @@ -57,8 +57,7 @@ public abstract class DataPartitioner protected int _n = 
-1; //blocksize if applicable protected boolean _allowBinarycell = true; - protected DataPartitioner( PDataPartitionFormat dpf, int n ) - { + protected DataPartitioner( PDataPartitionFormat dpf, int n ) { _format = dpf; _n = n; } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/39f75ca0/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerLocal.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerLocal.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerLocal.java index d16d8a5..eb273b6 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerLocal.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerLocal.java @@ -42,6 +42,7 @@ import org.apache.hadoop.mapred.TextInputFormat; import org.apache.sysml.conf.ConfigurationManager; import org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PDataPartitionFormat; +import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PartitionFormat; import org.apache.sysml.runtime.controlprogram.caching.MatrixObject; import org.apache.sysml.runtime.controlprogram.parfor.util.Cell; import org.apache.sysml.runtime.controlprogram.parfor.util.IDSequence; @@ -78,7 +79,6 @@ import org.apache.sysml.runtime.util.LocalFileUtils; */ public class DataPartitionerLocal extends DataPartitioner { - private static final boolean PARALLEL = true; private IDSequence _seq = null; @@ -94,13 +94,12 @@ public class DataPartitionerLocal extends DataPartitioner * @param par -1 for serial otherwise number of threads, can be ignored by implementation * @throws DMLRuntimeException if DMLRuntimeException occurs */ - public DataPartitionerLocal(PDataPartitionFormat dpf, int n, int par) + public DataPartitionerLocal(PartitionFormat dpf, int par) throws DMLRuntimeException { - super(dpf, n); + super(dpf._dpf, dpf._N); - //TODO - if( dpf == PDataPartitionFormat.ROW_BLOCK_WISE_N || dpf == PDataPartitionFormat.COLUMN_BLOCK_WISE_N ) + if( dpf.isBlockwise() ) throw new DMLRuntimeException("Data partitioning formt '"+dpf+"' not supported by DataPartitionerLocal" ); _seq = new IDSequence(); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/39f75ca0/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteMR.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteMR.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteMR.java index f666feb..5514007 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteMR.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteMR.java @@ -30,6 +30,7 @@ import org.apache.sysml.conf.ConfigurationManager; import org.apache.sysml.conf.DMLConfig; import org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PDataPartitionFormat; +import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PartitionFormat; import org.apache.sysml.runtime.controlprogram.caching.MatrixObject; import org.apache.sysml.runtime.controlprogram.parfor.util.PairWritableBlock; import org.apache.sysml.runtime.controlprogram.parfor.util.PairWritableCell; @@ -47,23 
+48,20 @@ import org.apache.sysml.yarn.DMLAppMasterUtils; */ public class DataPartitionerRemoteMR extends DataPartitioner { - private long _pfid = -1; private int _numReducers = -1; private int _replication = -1; - //private int _max_retry = -1; private boolean _jvmReuse = false; private boolean _keepIndexes = false; - public DataPartitionerRemoteMR(PDataPartitionFormat dpf, int n, long pfid, int numReducers, int replication, int max_retry, boolean jvmReuse, boolean keepIndexes) + public DataPartitionerRemoteMR(PartitionFormat dpf, long pfid, int numReducers, int replication, int max_retry, boolean jvmReuse, boolean keepIndexes) { - super(dpf, n); + super(dpf._dpf, dpf._N); _pfid = pfid; _numReducers = numReducers; _replication = replication; - //_max_retry = max_retry; _jvmReuse = jvmReuse; _keepIndexes = keepIndexes; } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/39f75ca0/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSpark.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSpark.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSpark.java index be758d2..1569709 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSpark.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSpark.java @@ -23,7 +23,7 @@ import org.apache.spark.api.java.JavaPairRDD; import org.apache.sysml.api.DMLScript; import org.apache.sysml.runtime.DMLRuntimeException; -import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PDataPartitionFormat; +import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PartitionFormat; import org.apache.sysml.runtime.controlprogram.caching.MatrixObject; import org.apache.sysml.runtime.controlprogram.context.ExecutionContext; import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext; @@ -47,9 +47,9 @@ public class DataPartitionerRemoteSpark extends DataPartitioner private ExecutionContext _ec = null; private long _numRed = -1; - public DataPartitionerRemoteSpark(PDataPartitionFormat dpf, int n, ExecutionContext ec, long numRed, boolean keepIndexes) + public DataPartitionerRemoteSpark(PartitionFormat dpf, ExecutionContext ec, long numRed, boolean keepIndexes) { - super(dpf, n); + super(dpf._dpf, dpf._N); _ec = ec; _numRed = numRed; @@ -78,7 +78,7 @@ public class DataPartitionerRemoteSpark extends DataPartitioner int numRed = (int)determineNumReducers(inRdd, mc, _numRed); //run spark remote data partition job - DataPartitionerRemoteSparkMapper dpfun = new DataPartitionerRemoteSparkMapper(mc, ii, oi, _format); + DataPartitionerRemoteSparkMapper dpfun = new DataPartitionerRemoteSparkMapper(mc, ii, oi, _format, _n); DataPartitionerRemoteSparkReducer wfun = new DataPartitionerRemoteSparkReducer(fnameNew, oi); inRdd.flatMapToPair(dpfun) //partition the input blocks .groupByKey(numRed) //group partition blocks http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/39f75ca0/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSparkMapper.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSparkMapper.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSparkMapper.java index 
9e690d5..0f76156 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSparkMapper.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSparkMapper.java @@ -50,28 +50,24 @@ public class DataPartitionerRemoteSparkMapper extends ParWorker implements PairF private static final long serialVersionUID = 332887624852010957L; - private long _rlen = -1; - private long _clen = -1; - private long _brlen = -1; - private long _bclen = -1; - private long _n = -1; - //private InputInfo _ii = null; - //private OutputInfo _oi = null; - private PDataPartitionFormat _dpf = null; + private final long _rlen; + private final long _clen; + private final long _brlen; + private final long _bclen; + private PDataPartitionFormat _dpf; + private final long _n; - public DataPartitionerRemoteSparkMapper(MatrixCharacteristics mc, InputInfo ii, OutputInfo oi, PDataPartitionFormat dpf) + public DataPartitionerRemoteSparkMapper(MatrixCharacteristics mc, InputInfo ii, OutputInfo oi, PDataPartitionFormat dpf, int n) throws DMLRuntimeException { _rlen = mc.getRows(); _clen = mc.getCols(); _brlen = mc.getRowsPerBlock(); _bclen = mc.getColsPerBlock(); - //_ii = ii; - //_oi = oi; _dpf = dpf; + _n = n; } - @Override public Iterator<Tuple2<Long, Writable>> call(Tuple2<MatrixIndexes, MatrixBlock> arg0) throws Exception @@ -113,10 +109,21 @@ public class DataPartitionerRemoteSparkMapper extends ParWorker implements PairF break; } case ROW_BLOCK_WISE_N:{ - PairWritableBlock tmp = new PairWritableBlock(); - tmp.indexes = new MatrixIndexes(((row_offset%_n)/_brlen)+1, col_offset/_bclen+1); - tmp.block = new MatrixBlock(value2); - ret.add(new Tuple2<Long,Writable>(new Long(row_offset/_n+1),tmp)); + if( _n >= _brlen ) { + PairWritableBlock tmp = new PairWritableBlock(); + tmp.indexes = new MatrixIndexes(((row_offset%_n)/_brlen)+1, col_offset/_bclen+1); + tmp.block = new MatrixBlock(value2); + ret.add(new Tuple2<Long,Writable>(new Long(row_offset/_n+1),tmp)); + } + else { + for( int i=0; i<rows; i+=_n ) { + PairWritableBlock tmp = new PairWritableBlock(); + tmp.indexes = new MatrixIndexes(1, col_offset/_bclen+1); + tmp.block = value2.sliceOperations(i, Math.min(i+(int)_n-1, value2.getNumRows()-1), + 0, value2.getNumColumns()-1, new MatrixBlock()); + ret.add(new Tuple2<Long,Writable>(new Long((row_offset+i)/_n+1),tmp)); + } + } break; } case COLUMN_WISE:{ @@ -137,10 +144,21 @@ public class DataPartitionerRemoteSparkMapper extends ParWorker implements PairF break; } case COLUMN_BLOCK_WISE_N: { - PairWritableBlock tmp = new PairWritableBlock(); - tmp.indexes = new MatrixIndexes(row_offset/_brlen+1, ((col_offset%_n)/_bclen)+1); - tmp.block = new MatrixBlock(value2); - ret.add(new Tuple2<Long,Writable>(new Long(col_offset/_n+1),tmp)); + if( _n >= _bclen ) { + PairWritableBlock tmp = new PairWritableBlock(); + tmp.indexes = new MatrixIndexes(row_offset/_brlen+1, ((col_offset%_n)/_bclen)+1); + tmp.block = new MatrixBlock(value2); + ret.add(new Tuple2<Long,Writable>(new Long(col_offset/_n+1),tmp)); + } + else { + for( int i=0; i<cols; i+=_n ) { + PairWritableBlock tmp = new PairWritableBlock(); + tmp.indexes = new MatrixIndexes(row_offset/_brlen+1, 1); + tmp.block = value2.sliceOperations(0, value2.getNumRows()-1, + i, Math.min(i+(int)_n-1, value2.getNumColumns()-1), new MatrixBlock()); + ret.add(new Tuple2<Long,Writable>(new Long((col_offset+i)/_n+1),tmp)); + } + } break; } 
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/39f75ca0/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ProgramConverter.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ProgramConverter.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ProgramConverter.java index 00e72c4..17d5936 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ProgramConverter.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ProgramConverter.java @@ -60,6 +60,7 @@ import org.apache.sysml.runtime.controlprogram.ProgramBlock; import org.apache.sysml.runtime.controlprogram.WhileProgramBlock; import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PDataPartitionFormat; import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PExecMode; +import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PartitionFormat; import org.apache.sysml.runtime.controlprogram.caching.MatrixObject; import org.apache.sysml.runtime.controlprogram.caching.MatrixObject.UpdateType; import org.apache.sysml.runtime.controlprogram.context.ExecutionContext; @@ -883,7 +884,8 @@ public class ProgramConverter MatrixFormatMetaData md = (MatrixFormatMetaData) dat.getMetaData(); MatrixCharacteristics mc = md.getMatrixCharacteristics(); value = mo.getFileName(); - PDataPartitionFormat partFormat = (mo.getPartitionFormat()!=null) ? mo.getPartitionFormat() : PDataPartitionFormat.NONE; + PartitionFormat partFormat = (mo.getPartitionFormat()!=null) ? new PartitionFormat( + mo.getPartitionFormat(),mo.getPartitionSize()) : PartitionFormat.NONE; matrixMetaData = new String[9]; matrixMetaData[0] = String.valueOf( mc.getRows() ); matrixMetaData[1] = String.valueOf( mc.getCols() ); @@ -1819,14 +1821,14 @@ public class ProgramConverter long nnz = Long.parseLong( st.nextToken() ); InputInfo iin = InputInfo.stringToInputInfo( st.nextToken() ); OutputInfo oin = OutputInfo.stringToOutputInfo( st.nextToken() ); - PDataPartitionFormat partFormat = PDataPartitionFormat.valueOf( st.nextToken() ); + PartitionFormat partFormat = PartitionFormat.valueOf( st.nextToken() ); UpdateType inplace = UpdateType.valueOf( st.nextToken() ); MatrixCharacteristics mc = new MatrixCharacteristics(rows, cols, brows, bcols, nnz); MatrixFormatMetaData md = new MatrixFormatMetaData( mc, oin, iin ); mo.setMetaData( md ); mo.setVarName( name ); - if( partFormat!=PDataPartitionFormat.NONE ) - mo.setPartitioned( partFormat, -1 ); //TODO once we support BLOCKWISE_N we should support it here as well + if( partFormat._dpf != PDataPartitionFormat.NONE ) + mo.setPartitioned( partFormat._dpf, partFormat._N ); mo.setUpdateType(inplace); dat = mo; break; http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/39f75ca0/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForMR.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForMR.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForMR.java index 293150e..d01368e 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForMR.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForMR.java @@ -42,7 +42,7 @@ import org.apache.sysml.conf.DMLConfig; import org.apache.sysml.runtime.DMLRuntimeException; import 
org.apache.sysml.runtime.controlprogram.LocalVariableMap; import org.apache.sysml.runtime.controlprogram.ParForProgramBlock; -import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PDataPartitionFormat; +import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PartitionFormat; import org.apache.sysml.runtime.controlprogram.caching.CacheStatistics; import org.apache.sysml.runtime.controlprogram.caching.CacheableData; import org.apache.sysml.runtime.controlprogram.caching.MatrixObject; @@ -68,12 +68,11 @@ import org.apache.sysml.yarn.DMLAppMasterUtils; */ public class RemoteDPParForMR { - protected static final Log LOG = LogFactory.getLog(RemoteDPParForMR.class.getName()); - public static RemoteParForJobReturn runJob(long pfid, String itervar, String matrixvar, String program, String resultFile, MatrixObject input, - PDataPartitionFormat dpf, OutputInfo oi, boolean tSparseCol, //config params - boolean enableCPCaching, int numReducers, int replication, int max_retry) //opt params + public static RemoteParForJobReturn runJob(long pfid, String itervar, String matrixvar, String program, + String resultFile, MatrixObject input, PartitionFormat dpf, OutputInfo oi, boolean tSparseCol, //config params + boolean enableCPCaching, int numReducers, int replication, int max_retry) //opt params throws DMLRuntimeException { RemoteParForJobReturn ret = null; @@ -104,7 +103,8 @@ public class RemoteDPParForMR long clen = input.getNumColumns(); int brlen = (int) input.getNumRowsPerBlock(); int bclen = (int) input.getNumColumnsPerBlock(); - MRJobConfiguration.setPartitioningInfo(job, rlen, clen, brlen, bclen, InputInfo.BinaryBlockInputInfo, oi, dpf, 1, input.getFileName(), itervar, matrixvar, tSparseCol); + MRJobConfiguration.setPartitioningInfo(job, rlen, clen, brlen, bclen, InputInfo.BinaryBlockInputInfo, + oi, dpf._dpf, dpf._N, input.getFileName(), itervar, matrixvar, tSparseCol); job.setInputFormat(InputInfo.BinaryBlockInputInfo.inputFormatClass); FileInputFormat.setInputPaths(job, path); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/39f75ca0/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSpark.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSpark.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSpark.java index b801402..a612a3e 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSpark.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSpark.java @@ -42,6 +42,7 @@ import org.apache.sysml.api.DMLScript; import org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.controlprogram.LocalVariableMap; import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PDataPartitionFormat; +import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PartitionFormat; import org.apache.sysml.runtime.controlprogram.caching.MatrixObject; import org.apache.sysml.runtime.controlprogram.context.ExecutionContext; import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext; @@ -70,7 +71,7 @@ public class RemoteDPParForSpark protected static final Log LOG = LogFactory.getLog(RemoteDPParForSpark.class.getName()); public static RemoteParForJobReturn runJob(long pfid, String itervar, String matrixvar, String program, HashMap<String, byte[]> clsMap, - String resultFile, MatrixObject input, 
ExecutionContext ec, PDataPartitionFormat dpf, OutputInfo oi, + String resultFile, MatrixObject input, ExecutionContext ec, PartitionFormat dpf, OutputInfo oi, boolean tSparseCol, boolean enableCPCaching, int numReducers ) throws DMLRuntimeException { @@ -91,7 +92,7 @@ public class RemoteDPParForSpark //compute number of reducers (to avoid OOMs and reduce memory pressure) int numParts = SparkUtils.getNumPreferredPartitions(mc, in); - int numParts2 = (int)((dpf==PDataPartitionFormat.ROW_BLOCK_WISE) ? mc.getRows() : mc.getCols()); + int numParts2 = (int)((dpf._dpf==PDataPartitionFormat.ROW_BLOCK_WISE) ? mc.getRows() : mc.getCols()); int numReducers2 = Math.max(numReducers, Math.min(numParts, numParts2)); //core parfor datapartition-execute (w/ or w/o shuffle, depending on data characteristics) @@ -123,7 +124,7 @@ public class RemoteDPParForSpark @SuppressWarnings("unchecked") private static JavaPairRDD<Long, Writable> getPartitionedInput(SparkExecutionContext sec, - String matrixvar, OutputInfo oi, PDataPartitionFormat dpf) + String matrixvar, OutputInfo oi, PartitionFormat dpf) throws DMLRuntimeException { InputInfo ii = InputInfo.BinaryBlockInputInfo; @@ -153,7 +154,7 @@ public class RemoteDPParForSpark { //get input rdd and data partitioning JavaPairRDD<MatrixIndexes,MatrixBlock> in = sec.getBinaryBlockRDDHandleForVariable(matrixvar); - DataPartitionerRemoteSparkMapper dpfun = new DataPartitionerRemoteSparkMapper(mc, ii, oi, dpf); + DataPartitionerRemoteSparkMapper dpfun = new DataPartitionerRemoteSparkMapper(mc, ii, oi, dpf._dpf, dpf._N); return in.flatMapToPair(dpfun); } //default binary block input rdd with grouping @@ -167,22 +168,22 @@ public class RemoteDPParForSpark mo.getRDDHandle().getLineageChilds().get(0)).getRDD(); //data partitioning of input rdd - DataPartitionerRemoteSparkMapper dpfun = new DataPartitionerRemoteSparkMapper(mc, ii, oi, dpf); + DataPartitionerRemoteSparkMapper dpfun = new DataPartitionerRemoteSparkMapper(mc, ii, oi, dpf._dpf, dpf._N); return in.flatMapToPair(dpfun); } } //determines if given input matrix requires grouping of partial partition slices - private static boolean requiresGrouping(PDataPartitionFormat dpf, MatrixObject mo) { + private static boolean requiresGrouping(PartitionFormat dpf, MatrixObject mo) { MatrixCharacteristics mc = mo.getMatrixCharacteristics(); - return ((dpf == PDataPartitionFormat.ROW_WISE && mc.getNumColBlocks() > 1) - || (dpf == PDataPartitionFormat.COLUMN_WISE && mc.getNumRowBlocks() > 1)) + return ((dpf == PartitionFormat.ROW_WISE && mc.getNumColBlocks() > 1) + || (dpf == PartitionFormat.COLUMN_WISE && mc.getNumRowBlocks() > 1)) && !hasInputDataSet(dpf, mo); } //determines if given input matrix wraps input data set applicable to direct processing - private static boolean hasInputDataSet(PDataPartitionFormat dpf, MatrixObject mo) { - return (dpf == PDataPartitionFormat.ROW_WISE + private static boolean hasInputDataSet(PartitionFormat dpf, MatrixObject mo) { + return (dpf == PartitionFormat.ROW_WISE && mo.getRDDHandle().isCheckpointRDD() && mo.getRDDHandle().getLineageChilds().size()==1 && mo.getRDDHandle().getLineageChilds().get(0).getLineageChilds().size()==1 http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/39f75ca0/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java index 400fbd5..0d368e8 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java @@ -33,6 +33,7 @@ import org.apache.spark.util.LongAccumulator; import org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.codegen.CodegenUtils; import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PDataPartitionFormat; +import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PartitionFormat; import org.apache.sysml.runtime.controlprogram.caching.CacheableData; import org.apache.sysml.runtime.controlprogram.caching.MatrixObject; import org.apache.sysml.runtime.controlprogram.parfor.Task.TaskType; @@ -70,7 +71,7 @@ public class RemoteDPParForSparkWorker extends ParWorker implements PairFlatMapF private final LongAccumulator _aIters; public RemoteDPParForSparkWorker(String program, HashMap<String, byte[]> clsMap, String inputVar, String iterVar, - boolean cpCaching, MatrixCharacteristics mc, boolean tSparseCol, PDataPartitionFormat dpf, OutputInfo oinfo, + boolean cpCaching, MatrixCharacteristics mc, boolean tSparseCol, PartitionFormat dpf, OutputInfo oinfo, LongAccumulator atasks, LongAccumulator aiters) throws DMLRuntimeException { @@ -86,12 +87,12 @@ public class RemoteDPParForSparkWorker extends ParWorker implements PairFlatMapF _aIters = aiters; //setup matrix block partition meta data - _rlen = (dpf != PDataPartitionFormat.ROW_WISE) ? (int)mc.getRows() : 1; - _clen = (dpf != PDataPartitionFormat.COLUMN_WISE) ? (int)mc.getCols() : 1; + _rlen = (dpf != PartitionFormat.ROW_WISE) ? (int)mc.getRows() : 1; + _clen = (dpf != PartitionFormat.COLUMN_WISE) ? 
(int)mc.getCols() : 1; _brlen = mc.getRowsPerBlock(); _bclen = mc.getColsPerBlock(); _tSparseCol = tSparseCol; - _dpf = dpf; + _dpf = dpf._dpf; } @Override http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/39f75ca0/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptNode.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptNode.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptNode.java index 26c30d4..22126fe 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptNode.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptNode.java @@ -28,6 +28,7 @@ import java.util.Set; import org.apache.sysml.lops.Lop; import org.apache.sysml.runtime.controlprogram.ParForProgramBlock; import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PDataPartitionFormat; +import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PartitionFormat; /** * Internal representation of a plan alternative for program blocks and instructions @@ -457,7 +458,7 @@ public class OptNode //partitioned read identified by selected partition format String tmp = getParam(ParamType.DATA_PARTITION_FORMAT); ret = ( tmp !=null - && PDataPartitionFormat.valueOf(tmp)!=PDataPartitionFormat.NONE + && PartitionFormat.valueOf(tmp)._dpf!=PDataPartitionFormat.NONE && flagNested ); } else http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/39f75ca0/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerConstrained.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerConstrained.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerConstrained.java index 6edcec3..2e4b95b 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerConstrained.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerConstrained.java @@ -29,12 +29,12 @@ import org.apache.sysml.parser.ParForStatementBlock; import org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.controlprogram.LocalVariableMap; import org.apache.sysml.runtime.controlprogram.ParForProgramBlock; -import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PDataPartitionFormat; import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PDataPartitioner; import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PExecMode; import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.POptMode; import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PResultMerge; import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PTaskPartitioner; +import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PartitionFormat; import org.apache.sysml.runtime.controlprogram.caching.MatrixObject; import org.apache.sysml.runtime.controlprogram.context.ExecutionContext; import org.apache.sysml.runtime.controlprogram.parfor.opt.CostEstimator.TestMeasure; @@ -107,7 +107,7 @@ public class OptimizerConstrained extends OptimizerRuleBased //OPTIMIZE PARFOR PLAN // rewrite 1: data partitioning (incl. log. 
recompile RIX) - HashMap<String, PDataPartitionFormat> partitionedMatrices = new HashMap<String,PDataPartitionFormat>(); + HashMap<String, PartitionFormat> partitionedMatrices = new HashMap<String, PartitionFormat>(); rewriteSetDataPartitioner( pn, ec.getVariables(), partitionedMatrices, OptimizerUtils.getLocalMemBudget() ); double M0b = _cost.getEstimate(TestMeasure.MEMORY_USAGE, pn); //reestimate @@ -230,7 +230,7 @@ public class OptimizerConstrained extends OptimizerRuleBased /// @Override - protected boolean rewriteSetDataPartitioner(OptNode n, LocalVariableMap vars, HashMap<String,PDataPartitionFormat> partitionedMatrices, double thetaM) + protected boolean rewriteSetDataPartitioner(OptNode n, LocalVariableMap vars, HashMap<String,PartitionFormat> partitionedMatrices, double thetaM) throws DMLRuntimeException { boolean blockwise = false; @@ -362,7 +362,7 @@ public class OptimizerConstrained extends OptimizerRuleBased //REWRITE set fused data partitioning / execution /// - protected void rewriteSetFusedDataPartitioningExecution(OptNode pn, double M, boolean flagLIX, HashMap<String, PDataPartitionFormat> partitionedMatrices, LocalVariableMap vars, PExecMode emode) + protected void rewriteSetFusedDataPartitioningExecution(OptNode pn, double M, boolean flagLIX, HashMap<String, PartitionFormat> partitionedMatrices, LocalVariableMap vars, PExecMode emode) throws DMLRuntimeException { if( emode == PExecMode.REMOTE_MR_DP || emode == PExecMode.REMOTE_SPARK_DP ) @@ -378,15 +378,15 @@ public class OptimizerConstrained extends OptimizerRuleBased } String moVarname = partitionedMatrices.keySet().iterator().next(); - PDataPartitionFormat moDpf = partitionedMatrices.get(moVarname); + PartitionFormat moDpf = partitionedMatrices.get(moVarname); MatrixObject mo = (MatrixObject)vars.get(moVarname); //check if access via iteration variable and sizes match String iterVarname = pfpb.getIterablePredicateVars()[0]; if( rIsAccessByIterationVariable(pn, moVarname, iterVarname) && - ((moDpf==PDataPartitionFormat.ROW_WISE && mo.getNumRows()==_N ) || - (moDpf==PDataPartitionFormat.COLUMN_WISE && mo.getNumColumns()==_N)) ) + ((moDpf==PartitionFormat.ROW_WISE && mo.getNumRows()==_N ) || + (moDpf==PartitionFormat.COLUMN_WISE && mo.getNumColumns()==_N)) ) { int k = (int)Math.min(_N,_rk2); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/39f75ca0/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java index 554b217..0e7170f 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java @@ -82,6 +82,7 @@ import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PExecMode; import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.POptMode; import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PResultMerge; import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PTaskPartitioner; +import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PartitionFormat; import org.apache.sysml.runtime.controlprogram.WhileProgramBlock; import org.apache.sysml.runtime.controlprogram.caching.MatrixObject; import 
org.apache.sysml.runtime.controlprogram.caching.MatrixObject.UpdateType; @@ -239,7 +240,7 @@ public class OptimizerRuleBased extends Optimizer //OPTIMIZE PARFOR PLAN // rewrite 1: data partitioning (incl. log. recompile RIX and flag opt nodes) - HashMap<String, PDataPartitionFormat> partitionedMatrices = new HashMap<String,PDataPartitionFormat>(); + HashMap<String, PartitionFormat> partitionedMatrices = new HashMap<String, PartitionFormat>(); rewriteSetDataPartitioner( pn, ec.getVariables(), partitionedMatrices, OptimizerUtils.getLocalMemBudget() ); double M0b = _cost.getEstimate(TestMeasure.MEMORY_USAGE, pn); //reestimate @@ -403,7 +404,7 @@ public class OptimizerRuleBased extends Optimizer //REWRITE set data partitioner /// - protected boolean rewriteSetDataPartitioner(OptNode n, LocalVariableMap vars, HashMap<String, PDataPartitionFormat> partitionedMatrices, double thetaM ) + protected boolean rewriteSetDataPartitioner(OptNode n, LocalVariableMap vars, HashMap<String, PartitionFormat> partitionedMatrices, double thetaM ) throws DMLRuntimeException { if( n.getNodeType() != NodeType.PARFOR ) @@ -423,13 +424,13 @@ public class OptimizerRuleBased extends Optimizer && (_N >= PROB_SIZE_THRESHOLD_PARTITIONING || _Nmax >= PROB_SIZE_THRESHOLD_PARTITIONING) ) //only if beneficial wrt problem size { ArrayList<String> cand = pfsb.getReadOnlyParentVars(); - HashMap<String, PDataPartitionFormat> cand2 = new HashMap<String, PDataPartitionFormat>(); + HashMap<String, PartitionFormat> cand2 = new HashMap<String, PartitionFormat>(); for( String c : cand ) { - PDataPartitionFormat dpf = pfsb.determineDataPartitionFormat( c ); + PartitionFormat dpf = pfsb.determineDataPartitionFormat( c ); - if( dpf != PDataPartitionFormat.NONE - && dpf != PDataPartitionFormat.BLOCK_WISE_M_N ) + if( dpf != PartitionFormat.NONE + && dpf._dpf != PDataPartitionFormat.BLOCK_WISE_M_N ) { cand2.put( c, dpf ); } @@ -459,7 +460,7 @@ public class OptimizerRuleBased extends Optimizer return blockwise; } - protected boolean rFindDataPartitioningCandidates( OptNode n, HashMap<String, PDataPartitionFormat> cand, LocalVariableMap vars, double thetaM ) + protected boolean rFindDataPartitioningCandidates( OptNode n, HashMap<String, PartitionFormat> cand, LocalVariableMap vars, double thetaM ) throws DMLRuntimeException { boolean ret = false; @@ -477,7 +478,7 @@ public class OptimizerRuleBased extends Optimizer String inMatrix = h.getInput().get(0).getName(); if( cand.containsKey(inMatrix) ) //Required Condition: partitioning applicable { - PDataPartitionFormat dpf = cand.get(inMatrix); + PartitionFormat dpf = cand.get(inMatrix); double mnew = getNewRIXMemoryEstimate( n, inMatrix, dpf, vars ); //NOTE: for the moment, we do not partition according to the remote mem, because we can execute //it even without partitioning in CP. 
However, advanced optimizers should reason about this @@ -516,7 +517,7 @@ public class OptimizerRuleBased extends Optimizer * @return memory estimate * @throws DMLRuntimeException if DMLRuntimeException occurs */ - protected double getNewRIXMemoryEstimate( OptNode n, String varName, PDataPartitionFormat dpf, LocalVariableMap vars ) + protected double getNewRIXMemoryEstimate( OptNode n, String varName, PartitionFormat dpf, LocalVariableMap vars ) throws DMLRuntimeException { double mem = -1; @@ -528,7 +529,7 @@ public class OptimizerRuleBased extends Optimizer MatrixObject mo = (MatrixObject) dat; //those are worst-case (dense) estimates - switch( dpf ) + switch( dpf._dpf ) { case COLUMN_WISE: mem = OptimizerUtils.estimateSize(mo.getNumRows(), 1); @@ -536,10 +537,12 @@ public class OptimizerRuleBased extends Optimizer case ROW_WISE: mem = OptimizerUtils.estimateSize(1, mo.getNumColumns()); break; - case BLOCK_WISE_M_N: - mem = Integer.MAX_VALUE; //TODO + case COLUMN_BLOCK_WISE_N: + mem = OptimizerUtils.estimateSize(mo.getNumRows(), dpf._N); break; - + case ROW_BLOCK_WISE_N: + mem = OptimizerUtils.estimateSize(dpf._N, mo.getNumColumns()); + break; default: //do nothing } @@ -586,7 +589,7 @@ public class OptimizerRuleBased extends Optimizer return LopProperties.ExecType.CP_FILE; } - public static boolean allowsBinaryCellPartitions( MatrixObject mo, PDataPartitionFormat dpf ) + public static boolean allowsBinaryCellPartitions( MatrixObject mo, PartitionFormat dpf ) throws DMLRuntimeException { return (getRIXExecType(mo, PDataPartitionFormat.COLUMN_BLOCK_WISE, false)==LopProperties.ExecType.CP ); @@ -1031,11 +1034,11 @@ public class OptimizerRuleBased extends Optimizer && n.getParam(ParamType.OPSTRING).equals(IndexingOp.OPSTRING) && n.getParam(ParamType.DATA_PARTITION_FORMAT) != null ) { - PDataPartitionFormat dpf = PDataPartitionFormat.valueOf(n.getParam(ParamType.DATA_PARTITION_FORMAT)); + PartitionFormat dpf = PartitionFormat.valueOf(n.getParam(ParamType.DATA_PARTITION_FORMAT)); Hop h = OptTreeConverter.getAbstractPlanMapping().getMappedHop(n.getID()); String inMatrix = h.getInput().get(0).getName(); String indexAccess = null; - switch( dpf ) + switch( dpf._dpf ) { case ROW_WISE: //input 1 and 2 eq if( h.getInput().get(1) instanceof DataOp ) @@ -1072,7 +1075,7 @@ public class OptimizerRuleBased extends Optimizer * @param vars local variable map * @throws DMLRuntimeException if DMLRuntimeException occurs */ - protected void rewriteSetPartitionReplicationFactor( OptNode n, HashMap<String, PDataPartitionFormat> partitionedMatrices, LocalVariableMap vars ) + protected void rewriteSetPartitionReplicationFactor( OptNode n, HashMap<String, PartitionFormat> partitionedMatrices, LocalVariableMap vars ) throws DMLRuntimeException { boolean apply = false; @@ -1487,7 +1490,7 @@ public class OptimizerRuleBased extends Optimizer * @param vars local variable map * @throws DMLRuntimeException if DMLRuntimeException occurs */ - protected void rewriteSetFusedDataPartitioningExecution(OptNode pn, double M, boolean flagLIX, HashMap<String, PDataPartitionFormat> partitionedMatrices, LocalVariableMap vars) + protected void rewriteSetFusedDataPartitioningExecution(OptNode pn, double M, boolean flagLIX, HashMap<String, PartitionFormat> partitionedMatrices, LocalVariableMap vars) throws DMLRuntimeException { //assertions (warnings of corrupt optimizer decisions) @@ -1513,15 +1516,15 @@ public class OptimizerRuleBased extends Optimizer //partitioned matrix String moVarname = 
partitionedMatrices.keySet().iterator().next(); - PDataPartitionFormat moDpf = partitionedMatrices.get(moVarname); + PartitionFormat moDpf = partitionedMatrices.get(moVarname); MatrixObject mo = (MatrixObject)vars.get(moVarname); //check if access via iteration variable and sizes match String iterVarname = pfpb.getIterablePredicateVars()[0]; if( rIsAccessByIterationVariable(pn, moVarname, iterVarname) && - ((moDpf==PDataPartitionFormat.ROW_WISE && mo.getNumRows()==_N ) || - (moDpf==PDataPartitionFormat.COLUMN_WISE && mo.getNumColumns()==_N)) ) + ((moDpf==PartitionFormat.ROW_WISE && mo.getNumRows()==_N ) || + (moDpf==PartitionFormat.COLUMN_WISE && mo.getNumColumns()==_N)) ) { int k = (int)Math.min(_N,_rk2); @@ -1554,11 +1557,11 @@ public class OptimizerRuleBased extends Optimizer && n.getParam(ParamType.OPSTRING).equals(IndexingOp.OPSTRING) && n.getParam(ParamType.DATA_PARTITION_FORMAT) != null ) { - PDataPartitionFormat dpf = PDataPartitionFormat.valueOf(n.getParam(ParamType.DATA_PARTITION_FORMAT)); + PartitionFormat dpf = PartitionFormat.valueOf(n.getParam(ParamType.DATA_PARTITION_FORMAT)); Hop h = OptTreeConverter.getAbstractPlanMapping().getMappedHop(n.getID()); String inMatrix = h.getInput().get(0).getName(); String indexAccess = null; - switch( dpf ) + switch( dpf._dpf ) { case ROW_WISE: //input 1 and 2 eq if( h.getInput().get(1) instanceof DataOp ) @@ -1584,7 +1587,7 @@ public class OptimizerRuleBased extends Optimizer //REWRITE transpose sparse vector operations /// - protected void rewriteSetTranposeSparseVectorOperations(OptNode pn, HashMap<String, PDataPartitionFormat> partitionedMatrices, LocalVariableMap vars) + protected void rewriteSetTranposeSparseVectorOperations(OptNode pn, HashMap<String, PartitionFormat> partitionedMatrices, LocalVariableMap vars) throws DMLRuntimeException { //assertions (warnings of corrupt optimizer decisions) @@ -1600,11 +1603,11 @@ public class OptimizerRuleBased extends Optimizer && partitionedMatrices.size()==1 ) //general applicable { String moVarname = partitionedMatrices.keySet().iterator().next(); - PDataPartitionFormat moDpf = partitionedMatrices.get(moVarname); + PartitionFormat moDpf = partitionedMatrices.get(moVarname); Data dat = vars.get(moVarname); if( dat !=null && dat instanceof MatrixObject - && moDpf == PDataPartitionFormat.COLUMN_WISE + && moDpf == PartitionFormat.COLUMN_WISE && ((MatrixObject)dat).getSparsity()<=MatrixBlock.SPARSITY_TURN_POINT //check for sparse matrix && rIsTransposeSafePartition(pn, moVarname) ) //tranpose-safe { @@ -2541,7 +2544,7 @@ public class OptimizerRuleBased extends Optimizer //REWRITE enable runtime piggybacking /// - protected void rewriteEnableRuntimePiggybacking( OptNode n, LocalVariableMap vars, HashMap<String, PDataPartitionFormat> partitionedMatrices ) + protected void rewriteEnableRuntimePiggybacking( OptNode n, LocalVariableMap vars, HashMap<String, PartitionFormat> partitionedMatrices ) throws DMLRuntimeException { ParForProgramBlock pfpb = (ParForProgramBlock) OptTreeConverter http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/39f75ca0/src/main/java/org/apache/sysml/runtime/instructions/cpfile/MatrixIndexingCPFileInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/cpfile/MatrixIndexingCPFileInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/cpfile/MatrixIndexingCPFileInstruction.java index 7b6bb29..a1e9ca1 100644 --- 
a/src/main/java/org/apache/sysml/runtime/instructions/cpfile/MatrixIndexingCPFileInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/cpfile/MatrixIndexingCPFileInstruction.java @@ -114,9 +114,15 @@ public final class MatrixIndexingCPFileInstruction extends IndexingCPInstruction case ROW_WISE: mcNew = new MatrixCharacteristics( 1, mc.getCols(), mc.getRowsPerBlock(), mc.getColsPerBlock() ); break; + case ROW_BLOCK_WISE_N: + mcNew = new MatrixCharacteristics( mo.getPartitionSize(), mc.getCols(), mc.getRowsPerBlock(), mc.getColsPerBlock() ); + break; case COLUMN_WISE: mcNew = new MatrixCharacteristics( mc.getRows(), 1, mc.getRowsPerBlock(), mc.getColsPerBlock() ); - break; + break; + case COLUMN_BLOCK_WISE_N: + mcNew = new MatrixCharacteristics( mc.getRows(), mo.getPartitionSize(), mc.getRowsPerBlock(), mc.getColsPerBlock() ); + break; default: throw new DMLRuntimeException("Unsupported partition format for CP_FILE rangeReIndex: "+ mo.getPartitionFormat()); } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/39f75ca0/src/main/java/org/apache/sysml/runtime/matrix/DataPartitionMR.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/DataPartitionMR.java b/src/main/java/org/apache/sysml/runtime/matrix/DataPartitionMR.java index fc05fb1..3675194 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/DataPartitionMR.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/DataPartitionMR.java @@ -22,6 +22,7 @@ package org.apache.sysml.runtime.matrix; import org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PDataPartitionFormat; +import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PartitionFormat; import org.apache.sysml.runtime.controlprogram.caching.MatrixObject; import org.apache.sysml.runtime.controlprogram.parfor.DataPartitioner; import org.apache.sysml.runtime.controlprogram.parfor.DataPartitionerRemoteMR; @@ -81,8 +82,8 @@ public class DataPartitionMR default: throw new DMLRuntimeException("Unsupported partition format for distributed cache input: "+pformat); } - - DataPartitioner dpart = new DataPartitionerRemoteMR(pformat, (int)N, -1, numReducers, replication, 4, false, true); + PartitionFormat pf = new PartitionFormat(pformat, (int)N); + DataPartitioner dpart = new DataPartitionerRemoteMR(pf, -1, numReducers, replication, 4, false, true); out = dpart.createPartitionedMatrixObject(in, out, true); sts[i] = out.getMatrixCharacteristics(); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/39f75ca0/src/test/java/org/apache/sysml/test/integration/functions/parfor/ParForBlockwiseDataPartitioningTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/sysml/test/integration/functions/parfor/ParForBlockwiseDataPartitioningTest.java b/src/test/java/org/apache/sysml/test/integration/functions/parfor/ParForBlockwiseDataPartitioningTest.java new file mode 100644 index 0000000..a01d22e --- /dev/null +++ b/src/test/java/org/apache/sysml/test/integration/functions/parfor/ParForBlockwiseDataPartitioningTest.java @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysml.test.integration.functions.parfor; + +import java.util.HashMap; + +import org.junit.Test; + +import org.apache.sysml.api.DMLScript; +import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM; +import org.apache.sysml.conf.CompilerConfig; +import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PDataPartitioner; +import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PExecMode; +import org.apache.sysml.runtime.matrix.data.MatrixValue.CellIndex; +import org.apache.sysml.test.integration.AutomatedTestBase; +import org.apache.sysml.test.integration.TestConfiguration; +import org.apache.sysml.test.utils.TestUtils; + +public class ParForBlockwiseDataPartitioningTest extends AutomatedTestBase +{ + //positive test cases, i.e., test cases where row/column block partitioning applied + private final static String TEST_NAME1 = "parfor_brdatapartitioning_pos"; + private final static String TEST_NAME2 = "parfor_bcdatapartitioning_pos"; + //negative test cases, i.e., test cases where row/column block partitioning not applied + private final static String TEST_NAME3 = "parfor_brdatapartitioning_neg"; + private final static String TEST_NAME4 = "parfor_bcdatapartitioning_neg"; + + private final static String TEST_DIR = "functions/parfor/"; + private final static String TEST_CLASS_DIR = TEST_DIR + ParForBlockwiseDataPartitioningTest.class.getSimpleName() + "/"; + private final static double eps = 1e-10; + + //moderate data size, force spark rix via unknowns + private final static int rows = (int)1014; + private final static int cols = (int)2024; + private final static double sparsity1 = 0.7; + private final static double sparsity2 = 0.01; + + @Override + public void setUp() { + TestUtils.clearAssertionInformation(); + addTestConfiguration(TEST_NAME1, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME1, new String[] { "Rout" }) ); + addTestConfiguration(TEST_NAME2, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME2, new String[] { "Rout" }) ); + addTestConfiguration(TEST_NAME3, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME3, new String[] { "Rout" }) ); + addTestConfiguration(TEST_NAME4, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME4, new String[] { "Rout" }) ); + } + + + @Test + public void testParForRowBlockPartitioningLocalLocalDense() { + runParForDataPartitioningTest(TEST_NAME1, PDataPartitioner.LOCAL, PExecMode.LOCAL, false); + } + + @Test + public void testParForRowBlockPartitioningLocalRemoteSparkDense() { + runParForDataPartitioningTest(TEST_NAME1, PDataPartitioner.LOCAL, PExecMode.REMOTE_SPARK, false); + } + + @Test + public void testParForRowBlockPartitioningRemoteSparkLocalDense() { + runParForDataPartitioningTest(TEST_NAME1, PDataPartitioner.REMOTE_SPARK, PExecMode.LOCAL, false); + } + + @Test + public void testParForRowBlockPartitioningRemoteSparkRemoteDense() { + runParForDataPartitioningTest(TEST_NAME1, PDataPartitioner.REMOTE_SPARK, PExecMode.REMOTE_SPARK, false); 
+ } + + @Test + public void testParForRowBlockPartitioningLocalLocalSparse() { + runParForDataPartitioningTest(TEST_NAME1, PDataPartitioner.LOCAL, PExecMode.LOCAL, true); + } + + @Test + public void testParForRowBlockPartitioningLocalRemoteSparkSparse() { + runParForDataPartitioningTest(TEST_NAME1, PDataPartitioner.LOCAL, PExecMode.REMOTE_SPARK, true); + } + + @Test + public void testParForRowBlockPartitioningRemoteSparkLocalSparse() { + runParForDataPartitioningTest(TEST_NAME1, PDataPartitioner.REMOTE_SPARK, PExecMode.LOCAL, true); + } + + @Test + public void testParForRowBlockPartitioningRemoteSparkRemoteSparse() { + runParForDataPartitioningTest(TEST_NAME1, PDataPartitioner.REMOTE_SPARK, PExecMode.REMOTE_SPARK, true); + } + + @Test + public void testParForColBlockPartitioningLocalLocalDense() { + runParForDataPartitioningTest(TEST_NAME2, PDataPartitioner.LOCAL, PExecMode.LOCAL, false); + } + + @Test + public void testParForColBlockPartitioningLocalRemoteSparkDense() { + runParForDataPartitioningTest(TEST_NAME2, PDataPartitioner.LOCAL, PExecMode.REMOTE_SPARK, false); + } + + @Test + public void testParForColBlockPartitioningRemoteSparkLocalDense() { + runParForDataPartitioningTest(TEST_NAME2, PDataPartitioner.REMOTE_SPARK, PExecMode.LOCAL, false); + } + + @Test + public void testParForColBlockPartitioningRemoteSparkRemoteDense() { + runParForDataPartitioningTest(TEST_NAME2, PDataPartitioner.REMOTE_SPARK, PExecMode.REMOTE_SPARK, false); + } + + @Test + public void testParForColBlockPartitioningLocalLocalSparse() { + runParForDataPartitioningTest(TEST_NAME2, PDataPartitioner.LOCAL, PExecMode.LOCAL, true); + } + + @Test + public void testParForColBlockPartitioningLocalRemoteSparkSparse() { + runParForDataPartitioningTest(TEST_NAME2, PDataPartitioner.LOCAL, PExecMode.REMOTE_SPARK, true); + } + + @Test + public void testParForColBlockPartitioningRemoteSparkLocalSparse() { + runParForDataPartitioningTest(TEST_NAME2, PDataPartitioner.REMOTE_SPARK, PExecMode.LOCAL, true); + } + + @Test + public void testParForColBlockPartitioningRemoteSparkRemoteSparse() { + runParForDataPartitioningTest(TEST_NAME2, PDataPartitioner.REMOTE_SPARK, PExecMode.REMOTE_SPARK, true); + } + + //negative examples + + @Test + public void testParForRowBlockPartitioningRemoteSparkLocalSparseNegative() { + runParForDataPartitioningTest(TEST_NAME3, PDataPartitioner.REMOTE_SPARK, PExecMode.LOCAL, true); + } + + @Test + public void testParForRowBlockPartitioningRemoteSparkRemoteSparseNegative() { + runParForDataPartitioningTest(TEST_NAME3, PDataPartitioner.REMOTE_SPARK, PExecMode.REMOTE_SPARK, true); + } + + @Test + public void testParForColBlockPartitioningRemoteSparkLocalSparseNegative() { + runParForDataPartitioningTest(TEST_NAME4, PDataPartitioner.REMOTE_SPARK, PExecMode.LOCAL, true); + } + + @Test + public void testParForColBlockPartitioningRemoteSparkRemoteSparseNegative() { + runParForDataPartitioningTest(TEST_NAME4, PDataPartitioner.REMOTE_SPARK, PExecMode.REMOTE_SPARK, true); + } + + private void runParForDataPartitioningTest( String testname, PDataPartitioner partitioner, PExecMode mode, boolean sparse ) + { + RUNTIME_PLATFORM oldRT = rtplatform; + boolean oldUseSparkConfig = DMLScript.USE_LOCAL_SPARK_CONFIG; + boolean oldDynRecompile = CompilerConfig.FLAG_DYN_RECOMPILE; + + //run always in spark execution mode + DMLScript.USE_LOCAL_SPARK_CONFIG = true; + rtplatform = RUNTIME_PLATFORM.HYBRID_SPARK; + + try + { + TestConfiguration config = getTestConfiguration(testname); + loadTestConfiguration(config); + + 
CompilerConfig.FLAG_DYN_RECOMPILE = false; + + String HOME = SCRIPT_DIR + TEST_DIR; + fullDMLScriptName = HOME + testname + ".dml"; + programArgs = new String[]{"-args", input("V"), + partitioner.name(), mode.name(), output("R") }; + + fullRScriptName = HOME + testname + ".R"; + rCmd = "Rscript" + " " + fullRScriptName + " " + inputDir() + " " + expectedDir(); + + //generate input data + int lrows = testname.equals(TEST_NAME1) || testname.equals(TEST_NAME3) ? rows : cols; + int lcols = testname.equals(TEST_NAME1) || testname.equals(TEST_NAME3) ? cols : rows; + double lsparsity = sparse ? sparsity2 : sparsity1; + double[][] V = getRandomMatrix(lrows, lcols, 0, 1, lsparsity, System.nanoTime()); + writeInputMatrixWithMTD("V", V, true); + + //run test + runTest(true, false, null, -1); + runRScript(true); + + //compare matrices + HashMap<CellIndex, Double> dmlfile = readDMLMatrixFromHDFS("R"); + HashMap<CellIndex, Double> rfile = readRMatrixFromFS("Rout"); + TestUtils.compareMatrices(dmlfile, rfile, eps, "DML", "R"); + } + finally + { + rtplatform = oldRT; + DMLScript.USE_LOCAL_SPARK_CONFIG = oldUseSparkConfig; + CompilerConfig.FLAG_DYN_RECOMPILE = oldDynRecompile; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/39f75ca0/src/test/scripts/functions/parfor/parfor_bcdatapartitioning_neg.R ---------------------------------------------------------------------- diff --git a/src/test/scripts/functions/parfor/parfor_bcdatapartitioning_neg.R b/src/test/scripts/functions/parfor/parfor_bcdatapartitioning_neg.R new file mode 100644 index 0000000..488d237 --- /dev/null +++ b/src/test/scripts/functions/parfor/parfor_bcdatapartitioning_neg.R @@ -0,0 +1,37 @@ +#------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# +#------------------------------------------------------------- + + +args <- commandArgs(TRUE) +options(digits=22) + +library("Matrix") + +X = as.matrix(readMM(paste(args[1], "V.mtx", sep=""))) +N = 200; + +R = matrix(0, ceiling(ncol(X)/N), 1); +for( bi in 1:ceiling(ncol(X)/N)) { + Xbi = X[,(7+(bi-1)*N+1):min(bi*N,ncol(X))]; + R[bi,1] = sum(Xbi); +} + +writeMM(as(R, "CsparseMatrix"), paste(args[2], "Rout", sep="")); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/39f75ca0/src/test/scripts/functions/parfor/parfor_bcdatapartitioning_neg.dml ---------------------------------------------------------------------- diff --git a/src/test/scripts/functions/parfor/parfor_bcdatapartitioning_neg.dml b/src/test/scripts/functions/parfor/parfor_bcdatapartitioning_neg.dml new file mode 100644 index 0000000..5a08430 --- /dev/null +++ b/src/test/scripts/functions/parfor/parfor_bcdatapartitioning_neg.dml @@ -0,0 +1,37 @@ +#------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +#------------------------------------------------------------- + + +X = read($1); +N = 200; + +if( sum(X)<0 ) #cause unknown sparsity + X = matrix(1, rows=100000000, cols=ncol(X)); + +R = matrix(0, rows=ceil(ncol(X)/N), cols=1); + +parfor( bi in 1:ceil(ncol(X)/N), opt=CONSTRAINED, datapartitioner=$2, mode=$3, log=DEBUG) { + Xbi = X[,(7+(bi-1)*N+1):min(bi*N,ncol(X))]; + print(sum(Xbi)); + R[bi,1] = sum(Xbi); +} + +write(R, $4); \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/39f75ca0/src/test/scripts/functions/parfor/parfor_bcdatapartitioning_pos.R ---------------------------------------------------------------------- diff --git a/src/test/scripts/functions/parfor/parfor_bcdatapartitioning_pos.R b/src/test/scripts/functions/parfor/parfor_bcdatapartitioning_pos.R new file mode 100644 index 0000000..62413b5 --- /dev/null +++ b/src/test/scripts/functions/parfor/parfor_bcdatapartitioning_pos.R @@ -0,0 +1,37 @@ +#------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. 
See the License for the +# specific language governing permissions and limitations +# under the License. +# +#------------------------------------------------------------- + + +args <- commandArgs(TRUE) +options(digits=22) + +library("Matrix") + +X = as.matrix(readMM(paste(args[1], "V.mtx", sep=""))) +N = 200; + +R = matrix(0, ceiling(ncol(X)/N), 1); +for( bi in 1:ceiling(ncol(X)/N)) { + Xbi = X[,((bi-1)*N+1):min(bi*N,ncol(X))]; + R[bi,1] = sum(Xbi); +} + +writeMM(as(R, "CsparseMatrix"), paste(args[2], "Rout", sep="")); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/39f75ca0/src/test/scripts/functions/parfor/parfor_bcdatapartitioning_pos.dml ---------------------------------------------------------------------- diff --git a/src/test/scripts/functions/parfor/parfor_bcdatapartitioning_pos.dml b/src/test/scripts/functions/parfor/parfor_bcdatapartitioning_pos.dml new file mode 100644 index 0000000..0e6b56a --- /dev/null +++ b/src/test/scripts/functions/parfor/parfor_bcdatapartitioning_pos.dml @@ -0,0 +1,37 @@ +#------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +#------------------------------------------------------------- + + +X = read($1); +N = 200; + +if( sum(X)<0 ) #cause unknown sparsity + X = matrix(1, rows=100000000, cols=ncol(X)); + +R = matrix(0, rows=ceil(ncol(X)/N), cols=1); + +parfor( bi in 1:ceil(ncol(X)/N), opt=CONSTRAINED, datapartitioner=$2, mode=$3, log=DEBUG) { + Xbi = X[,((bi-1)*N+1):min(bi*N,ncol(X))]; + print(sum(Xbi)); + R[bi,1] = sum(Xbi); +} + +write(R, $4);
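----------------------------------------------------------------------
The rewrites above consistently replace the bare PDataPartitionFormat enum with the new PartitionFormat wrapper, which pairs the base format (field _dpf) with the batch size _N in the partitioning dimension. The following simplified stand-in is illustrative only and is not part of the patch; the field names, the constants, and the candidate filter mirror their usage in the diff, while all other details (constructor visibility, additional formats, equals/valueOf handling) are assumptions.

public class PartitionFormatSketch {
    enum PDataPartitionFormat { NONE, ROW_WISE, COLUMN_WISE, ROW_BLOCK_WISE_N,
        COLUMN_BLOCK_WISE_N, BLOCK_WISE_M_N }

    static final class PartitionFormat {
        final PDataPartitionFormat _dpf; // base partitioning format
        final int _N;                    // batch size in the partitioning dimension, -1 if not applicable
        PartitionFormat(PDataPartitionFormat dpf, int N) { _dpf = dpf; _N = N; }

        // row-wise, column-wise, and none formats carry no batch size
        static final PartitionFormat NONE        = new PartitionFormat(PDataPartitionFormat.NONE, -1);
        static final PartitionFormat ROW_WISE    = new PartitionFormat(PDataPartitionFormat.ROW_WISE, -1);
        static final PartitionFormat COLUMN_WISE = new PartitionFormat(PDataPartitionFormat.COLUMN_WISE, -1);
    }

    public static void main(String[] args) {
        // column batches of 200 columns, as accessed by parfor_bcdatapartitioning_pos.dml (N = 200)
        PartitionFormat dpf = new PartitionFormat(PDataPartitionFormat.COLUMN_BLOCK_WISE_N, 200);
        // candidate filter added to rewriteSetDataPartitioner: keep everything except
        // NONE and the unsupported BLOCK_WISE_M_N format
        boolean candidate = dpf != PartitionFormat.NONE
            && dpf._dpf != PDataPartitionFormat.BLOCK_WISE_M_N;
        System.out.println(candidate + ": " + dpf._dpf + ", N=" + dpf._N);
    }
}

Note that the positive and negative test scripts above differ only in the constant offset 7 added to the lower bound of the accessed column range; with that offset the accesses no longer follow the batch pattern ((bi-1)*N+1):min(bi*N,ncol(X)), and, as the test class comments state, block partitioning is not applied in the negative cases.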
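----------------------------------------------------------------------
The new switch cases in getNewRIXMemoryEstimate size a single partition as estimateSize(dpf._N, ncols) for ROW_BLOCK_WISE_N and estimateSize(nrows, dpf._N) for COLUMN_BLOCK_WISE_N, instead of the former Integer.MAX_VALUE placeholder used for BLOCK_WISE_M_N. The sketch below only illustrates the resulting worst-case dense arithmetic; the 1014 x 2024 dimensions and the batch size N=200 are taken from ParForBlockwiseDataPartitioningTest and the test scripts, and the flat 8-bytes-per-cell model is a deliberate simplification of OptimizerUtils.estimateSize.

public class PartitionSizeSketch {
    // worst-case dense size in bytes, ignoring block headers and sparse encodings
    static long denseBytes(long rows, long cols) {
        return rows * cols * 8L;
    }

    public static void main(String[] args) {
        long rows = 1014, cols = 2024; // row-partitioning test matrix (the column test transposes the dimensions)
        long N = 200;                  // batch size in the partitioning dimension

        System.out.println("ROW_WISE            (1 x " + cols + "): " + denseBytes(1, cols) + " bytes");
        System.out.println("ROW_BLOCK_WISE_N    (" + N + " x " + cols + "): " + denseBytes(N, cols) + " bytes");
        System.out.println("COLUMN_WISE         (" + rows + " x 1): " + denseBytes(rows, 1) + " bytes");
        System.out.println("COLUMN_BLOCK_WISE_N (" + rows + " x " + N + "): " + denseBytes(rows, N) + " bytes");
        System.out.println("full matrix         (" + rows + " x " + cols + "): " + denseBytes(rows, cols) + " bytes");
    }
}

For these dimensions, a row- or column-wise partition stays in the kilobyte range, a block-wise batch of 200 rows or columns lands in the low megabytes, and the full matrix is roughly 16 MB; making the estimate depend on dpf._N is what lets the optimizer decide per matrix whether block partitions still fit the local memory budget passed to rewriteSetDataPartitioner.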
