[SYSTEMML-1877] Fix setup parfor data-partition-execute (univar stats)

This patch fixes a setup issue for column-wise partitioning in the fused parfor data-partition-execute Spark job. To avoid such issues in the future, it also refactors the code to centralize the analysis of partition sizes and adds test cases that force the fused data-partition-execute job on both MR and Spark.
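For the partition-size refactoring mentioned above, here is a condensed, standalone sketch of the helpers added to ParForProgramBlock.PartitionFormat in the diff below; the class name PartitionSizeSketch and the flattened parameter lists are hypothetical, with plain longs standing in for MatrixCharacteristics:

// Standalone sketch of the centralized partition-size analysis added to
// ParForProgramBlock.PartitionFormat (simplified: class name and parameter
// lists are hypothetical, plain longs stand in for MatrixCharacteristics).
public class PartitionSizeSketch {

    enum PDataPartitionFormat {
        ROW_WISE, ROW_BLOCK_WISE, ROW_BLOCK_WISE_N,
        COLUMN_WISE, COLUMN_BLOCK_WISE, COLUMN_BLOCK_WISE_N
    }

    // Rows of a single partition, given rlen matrix rows, brlen rows per block,
    // and n rows per ROW_BLOCK_WISE_N partition.
    static long getNumRows(PDataPartitionFormat dpf, long rlen, int brlen, long n) {
        switch (dpf) {
            case ROW_WISE:            return 1;      // one row per partition
            case ROW_BLOCK_WISE:      return brlen;  // one row block per partition
            case ROW_BLOCK_WISE_N:    return n;      // n rows per partition
            case COLUMN_WISE:
            case COLUMN_BLOCK_WISE:
            case COLUMN_BLOCK_WISE_N: return rlen;   // column partitions span all rows
            default: throw new RuntimeException("Unsupported partition format: " + dpf);
        }
    }

    // Columns of a single partition, symmetric to getNumRows.
    static long getNumColumns(PDataPartitionFormat dpf, long clen, int bclen, long n) {
        switch (dpf) {
            case ROW_WISE:
            case ROW_BLOCK_WISE:
            case ROW_BLOCK_WISE_N:    return clen;   // row partitions span all columns
            case COLUMN_WISE:         return 1;      // one column per partition
            case COLUMN_BLOCK_WISE:   return bclen;  // one column block per partition
            case COLUMN_BLOCK_WISE_N: return n;      // n columns per partition
            default: throw new RuntimeException("Unsupported partition format: " + dpf);
        }
    }

    public static void main(String[] args) {
        // The column-wise case addressed by this patch: for a 2001 x 101 input
        // (the test dimensions), each partition is a 2001 x 1 column.
        long rlen = 2001, clen = 101;
        int brlen = 1000, bclen = 1000;
        System.out.println(getNumRows(PDataPartitionFormat.COLUMN_WISE, rlen, brlen, -1)
            + " x " + getNumColumns(PDataPartitionFormat.COLUMN_WISE, clen, bclen, -1));
    }
}

Both the Spark worker (RemoteDPParForSparkWorker) and the MR reducer (RemoteDPParWorkerReducer) now derive their partition dimensions through these helpers instead of duplicating the per-format switch, which is what the refactoring in the diff below removes.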
Additionally, this patch fixes the constrained parfor optimizer to consider the fused data-partition-execute job (so far, this job was never selected even if both data partitioning and execution were forced to its remote execution types). Project: http://git-wip-us.apache.org/repos/asf/systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/9178a954 Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/9178a954 Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/9178a954 Branch: refs/heads/master Commit: 9178a95495227e978dcc1049f04431ddbd2c4fc5 Parents: ec35215 Author: Matthias Boehm <[email protected]> Authored: Thu Aug 31 22:39:32 2017 -0700 Committer: Matthias Boehm <[email protected]> Committed: Fri Sep 1 12:13:25 2017 -0700 ---------------------------------------------------------------------- .../controlprogram/ParForProgramBlock.java | 29 ++++ .../parfor/DataPartitionerRemoteMapper.java | 28 ++-- .../parfor/RemoteDPParForSparkWorker.java | 21 +-- .../parfor/RemoteDPParWorkerReducer.java | 33 ++-- .../RemoteParForColocatedNLineInputFormat.java | 20 +-- .../parfor/opt/OptimizerConstrained.java | 21 ++- .../parfor/opt/OptimizerRuleBased.java | 3 +- .../matrix/mapred/MRJobConfiguration.java | 47 ++---- .../parfor/ParForDataPartitionExecuteTest.java | 153 +++++++++++++++++++ .../functions/parfor/DataPartitionExecute.R | 41 +++++ .../functions/parfor/DataPartitionExecute.dml | 42 +++++ .../functions/parfor/ZPackageSuite.java | 1 + 12 files changed, 330 insertions(+), 109 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/systemml/blob/9178a954/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java b/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java index 3a9bf51..ce8bbed 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java @@ -215,6 +215,11 @@ public class ParForProgramBlock extends ForProgramBlock return _dpf == PDataPartitionFormat.COLUMN_BLOCK_WISE_N || _dpf == PDataPartitionFormat.ROW_BLOCK_WISE_N; } + public boolean isRowwise() { + return _dpf == PDataPartitionFormat.ROW_WISE + || _dpf == PDataPartitionFormat.ROW_BLOCK_WISE + || _dpf == PDataPartitionFormat.ROW_BLOCK_WISE_N; + } public long getNumParts(MatrixCharacteristics mc) { switch( _dpf ) { case ROW_WISE: return mc.getRows(); @@ -227,6 +232,30 @@ public class ParForProgramBlock extends ForProgramBlock throw new RuntimeException("Unsupported partition format: "+_dpf); } } + public long getNumRows(MatrixCharacteristics mc) { + switch( _dpf ) { + case ROW_WISE: return 1; + case ROW_BLOCK_WISE: return mc.getRowsPerBlock(); + case ROW_BLOCK_WISE_N: return _N; + case COLUMN_WISE: return mc.getRows(); + case COLUMN_BLOCK_WISE: return mc.getRows(); + case COLUMN_BLOCK_WISE_N: return mc.getRows(); + default: + throw new RuntimeException("Unsupported partition format: "+_dpf); + } + } + public long getNumColumns(MatrixCharacteristics mc) { + switch( _dpf ) { + case ROW_WISE: return mc.getCols(); + case ROW_BLOCK_WISE: return mc.getCols(); + case ROW_BLOCK_WISE_N: return mc.getCols(); + case COLUMN_WISE: return 1; + case COLUMN_BLOCK_WISE: return mc.getColsPerBlock(); + case COLUMN_BLOCK_WISE_N: return _N; 
+ default: + throw new RuntimeException("Unsupported partition format: "+_dpf); + } + } } public enum PDataPartitioner { http://git-wip-us.apache.org/repos/asf/systemml/blob/9178a954/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteMapper.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteMapper.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteMapper.java index 6fcfc1a..e26201e 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteMapper.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteMapper.java @@ -31,6 +31,7 @@ import org.apache.hadoop.mapred.Reporter; import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PDataPartitionFormat; import org.apache.sysml.runtime.controlprogram.parfor.util.PairWritableBlock; import org.apache.sysml.runtime.controlprogram.parfor.util.PairWritableCell; +import org.apache.sysml.runtime.matrix.MatrixCharacteristics; import org.apache.sysml.runtime.matrix.data.IJV; import org.apache.sysml.runtime.matrix.data.InputInfo; import org.apache.sysml.runtime.matrix.data.MatrixBlock; @@ -48,15 +49,11 @@ import org.apache.sysml.runtime.util.MapReduceTool; */ public class DataPartitionerRemoteMapper implements Mapper<Writable, Writable, Writable, Writable> -{ - +{ private DataPartitionerMapper _mapper = null; - public DataPartitionerRemoteMapper( ) - { - - } - + public DataPartitionerRemoteMapper( ) { } + @Override public void map(Writable key, Writable value, OutputCollector<Writable, Writable> out, Reporter reporter) throws IOException @@ -67,10 +64,7 @@ public class DataPartitionerRemoteMapper @Override public void configure(JobConf job) { - long rlen = MRJobConfiguration.getPartitioningNumRows( job ); - long clen = MRJobConfiguration.getPartitioningNumCols( job ); - int brlen = MRJobConfiguration.getPartitioningBlockNumRows( job ); - int bclen = MRJobConfiguration.getPartitioningBlockNumCols( job ); + MatrixCharacteristics mc = MRJobConfiguration.getPartitionedMatrixSize(job); InputInfo ii = MRJobConfiguration.getPartitioningInputInfo( job ); OutputInfo oi = MRJobConfiguration.getPartitioningOutputInfo( job ); PDataPartitionFormat pdf = MRJobConfiguration.getPartitioningFormat( job ); @@ -78,17 +72,21 @@ public class DataPartitionerRemoteMapper boolean keepIndexes = MRJobConfiguration.getPartitioningIndexFlag( job ); if( ii == InputInfo.TextCellInputInfo ) - _mapper = new DataPartitionerMapperTextcell(rlen, clen, brlen, bclen, pdf, n); + _mapper = new DataPartitionerMapperTextcell(mc.getRows(), mc.getCols(), + mc.getRowsPerBlock(), mc.getColsPerBlock(), pdf, n); else if( ii == InputInfo.BinaryCellInputInfo ) - _mapper = new DataPartitionerMapperBinarycell(rlen, clen, brlen, bclen, pdf, n); + _mapper = new DataPartitionerMapperBinarycell(mc.getRows(), mc.getCols(), + mc.getRowsPerBlock(), mc.getColsPerBlock(), pdf, n); else if( ii == InputInfo.BinaryBlockInputInfo ) { if( oi == OutputInfo.BinaryBlockOutputInfo ) - _mapper = new DataPartitionerMapperBinaryblock(rlen, clen, brlen, bclen, pdf, n, keepIndexes); + _mapper = new DataPartitionerMapperBinaryblock(mc.getRows(), mc.getCols(), + mc.getRowsPerBlock(), mc.getColsPerBlock(), pdf, n, keepIndexes); else if( oi == OutputInfo.BinaryCellOutputInfo ) { boolean outputEmpty = MRJobConfiguration.getProgramBlocks(job)!=null; //fused parfor - 
_mapper = new DataPartitionerMapperBinaryblock2Binarycell(job, rlen, clen, brlen, bclen, pdf, n, keepIndexes, outputEmpty); + _mapper = new DataPartitionerMapperBinaryblock2Binarycell(job, mc.getRows(), mc.getCols(), + mc.getRowsPerBlock(), mc.getColsPerBlock(), pdf, n, keepIndexes, outputEmpty); } else throw new RuntimeException("Partitioning from '"+ii+"' to '"+oi+"' not supported"); http://git-wip-us.apache.org/repos/asf/systemml/blob/9178a954/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java index d20e7f3..dbc3fbf 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java @@ -87,18 +87,8 @@ public class RemoteDPParForSparkWorker extends ParWorker implements PairFlatMapF _aIters = aiters; //setup matrix block partition meta data - switch( dpf._dpf ) { - case ROW_WISE: - _rlen = (int)mc.getRows(); _clen = 1; break; - case ROW_BLOCK_WISE_N: - _rlen = dpf._N; _clen = (int)mc.getCols(); break; - case COLUMN_BLOCK_WISE: - _rlen = 1; _clen = (int)mc.getCols(); break; - case COLUMN_BLOCK_WISE_N: - _rlen = (int)mc.getRows(); _clen = dpf._N; break; - default: - throw new RuntimeException("Unsupported partition format: "+dpf._dpf.name()); - } + _rlen = (int)dpf.getNumRows(mc); + _clen = (int)dpf.getNumColumns(mc); _brlen = mc.getRowsPerBlock(); _bclen = mc.getColsPerBlock(); _tSparseCol = tSparseCol; @@ -237,8 +227,8 @@ public class RemoteDPParForSparkWorker extends ParWorker implements PairFlatMapF int col_offset = (int)(pval.indexes.getColumnIndex()-1)*_bclen; if( !partition.isInSparseFormat() ) //DENSE partition.copy( row_offset, row_offset+pval.block.getNumRows()-1, - col_offset, col_offset+pval.block.getNumColumns()-1, - pval.block, false ); + col_offset, col_offset+pval.block.getNumColumns()-1, + pval.block, false ); else //SPARSE partition.appendToSparse(pval.block, row_offset, col_offset); lnnz += pval.block.getNonZeros(); @@ -250,8 +240,7 @@ public class RemoteDPParForSparkWorker extends ParWorker implements PairFlatMapF partition.setNonZeros(lnnz); partition.examSparsity(); } - catch(DMLRuntimeException ex) - { + catch(DMLRuntimeException ex) { throw new IOException(ex); } http://git-wip-us.apache.org/repos/asf/systemml/blob/9178a954/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParWorkerReducer.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParWorkerReducer.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParWorkerReducer.java index 3f26945..d022ac8 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParWorkerReducer.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParWorkerReducer.java @@ -32,6 +32,7 @@ import org.apache.sysml.api.DMLScript; import org.apache.sysml.conf.ConfigurationManager; import org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PDataPartitionFormat; +import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PartitionFormat; import 
org.apache.sysml.runtime.controlprogram.caching.CacheStatistics; import org.apache.sysml.runtime.controlprogram.caching.CacheableData; import org.apache.sysml.runtime.controlprogram.caching.MatrixObject; @@ -42,6 +43,7 @@ import org.apache.sysml.runtime.controlprogram.parfor.util.IDHandler; import org.apache.sysml.runtime.controlprogram.parfor.util.PairWritableBlock; import org.apache.sysml.runtime.controlprogram.parfor.util.PairWritableCell; import org.apache.sysml.runtime.instructions.cp.IntObject; +import org.apache.sysml.runtime.matrix.MatrixCharacteristics; import org.apache.sysml.runtime.matrix.data.MatrixBlock; import org.apache.sysml.runtime.matrix.data.OutputInfo; import org.apache.sysml.runtime.matrix.mapred.MRConfigurationNames; @@ -66,18 +68,15 @@ public class RemoteDPParWorkerReducer extends ParWorker //reuse matrix partition private MatrixBlock _partition = null; private boolean _tSparseCol = false; - - //MR ParWorker attributes - protected String _stringID = null; + + //MR ParWorker attributes + protected String _stringID = null; //cached collector/reporter protected OutputCollector<Writable, Writable> _out = null; protected Reporter _report = null; - public RemoteDPParWorkerReducer() - { - - } + public RemoteDPParWorkerReducer() { } @Override public void reduce(LongWritable key, Iterator<Writable> valueList, OutputCollector<Writable, Writable> out, Reporter reporter) @@ -107,8 +106,7 @@ public class RemoteDPParWorkerReducer extends ParWorker //execute program executeTask( lTask ); } - catch(Exception ex) - { + catch(Exception ex) { throw new IOException("ParFOR: Failed to execute task.",ex); } @@ -120,18 +118,15 @@ public class RemoteDPParWorkerReducer extends ParWorker public void configure(JobConf job) { //Step 1: configure data partitioning information - _rlen = (int)MRJobConfiguration.getPartitioningNumRows( job ); - _clen = (int)MRJobConfiguration.getPartitioningNumCols( job ); - _brlen = MRJobConfiguration.getPartitioningBlockNumRows( job ); - _bclen = MRJobConfiguration.getPartitioningBlockNumCols( job ); + _dpf = MRJobConfiguration.getPartitioningFormat( job ); + MatrixCharacteristics mc = MRJobConfiguration.getPartitionedMatrixSize(job); + PartitionFormat pf = new PartitionFormat(_dpf, MRJobConfiguration.getPartitioningSizeN(job)); + _rlen = (int)pf.getNumRows(mc); + _clen = (int)pf.getNumColumns(mc); + _brlen = mc.getRowsPerBlock(); + _bclen = mc.getColsPerBlock(); _iterVar = MRJobConfiguration.getPartitioningItervar( job ); _inputVar = MRJobConfiguration.getPartitioningMatrixvar( job ); - _dpf = MRJobConfiguration.getPartitioningFormat( job ); - switch( _dpf ) { //create matrix partition for reuse - case ROW_WISE: _rlen = 1; break; - case COLUMN_WISE: _clen = 1; break; - default: throw new RuntimeException("Partition format not yet supported in fused partition-execute: "+_dpf); - } _info = MRJobConfiguration.getPartitioningOutputInfo( job ); _tSparseCol = MRJobConfiguration.getPartitioningTransposedCol( job ); if( _tSparseCol ) http://git-wip-us.apache.org/repos/asf/systemml/blob/9178a954/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForColocatedNLineInputFormat.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForColocatedNLineInputFormat.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForColocatedNLineInputFormat.java index d5cc4bc..a2a231d 100644 --- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForColocatedNLineInputFormat.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForColocatedNLineInputFormat.java @@ -27,6 +27,8 @@ import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.lib.NLineInputFormat; import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PDataPartitionFormat; +import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PartitionFormat; +import org.apache.sysml.runtime.matrix.MatrixCharacteristics; import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration; /** @@ -37,29 +39,21 @@ import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration; */ public class RemoteParForColocatedNLineInputFormat extends NLineInputFormat { - @Override public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException - { + { InputSplit[] tmp = super.getSplits(job, numSplits); //get partitioning information + MatrixCharacteristics mc = MRJobConfiguration.getPartitionedMatrixSize(job); PDataPartitionFormat dpf = MRJobConfiguration.getPartitioningFormat(job); - int blen = -1; - switch( dpf ) { - case ROW_WISE: blen = 1; break; - case ROW_BLOCK_WISE: blen = MRJobConfiguration.getPartitioningBlockNumRows(job); break; - case COLUMN_WISE: blen = 1; break; - case COLUMN_BLOCK_WISE: blen = MRJobConfiguration.getPartitioningBlockNumCols(job); break; - default: - //do nothing - } + PartitionFormat pf = new PartitionFormat(dpf, -1); + int blen = (int) (pf.isRowwise() ? pf.getNumRows(mc) : pf.getNumColumns(mc)); String fname = MRJobConfiguration.getPartitioningFilename(job); //create wrapper splits InputSplit[] ret = new InputSplit[ tmp.length ]; - for( int i=0; i<tmp.length; i++ ) - { + for( int i=0; i<tmp.length; i++ ) { //check for robustness of subsequent cast if( tmp[i] instanceof FileSplit ) ret[i] = new RemoteParForColocatedFileSplit( (FileSplit) tmp[i], fname, blen ); http://git-wip-us.apache.org/repos/asf/systemml/blob/9178a954/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerConstrained.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerConstrained.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerConstrained.java index 9e78c2a..1ca5631 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerConstrained.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerConstrained.java @@ -231,19 +231,18 @@ public class OptimizerConstrained extends OptimizerRuleBased protected boolean rewriteSetDataPartitioner(OptNode n, LocalVariableMap vars, HashMap<String,PartitionFormat> partitionedMatrices, double thetaM) throws DMLRuntimeException { - boolean blockwise = false; - + //call rewrite first to obtain partitioning information + String initPlan = n.getParam(ParamType.DATA_PARTITIONER); + boolean blockwise = super.rewriteSetDataPartitioner(n, vars, partitionedMatrices, thetaM); + // constraint awareness - if( !n.getParam(ParamType.DATA_PARTITIONER).equals(PDataPartitioner.UNSPECIFIED.toString()) ) - { - Object[] o = OptTreeConverter.getAbstractPlanMapping().getMappedProg(n.getID()); - ParForProgramBlock pfpb = (ParForProgramBlock) o[1]; - pfpb.setDataPartitioner(PDataPartitioner.valueOf(n.getParam(ParamType.DATA_PARTITIONER))); - LOG.debug(getOptMode()+" OPT: forced 'set data partitioner' - 
result="+n.getParam(ParamType.DATA_PARTITIONER) ); + if( !initPlan.equals(PDataPartitioner.UNSPECIFIED.name()) ) { + ParForProgramBlock pfpb = (ParForProgramBlock) OptTreeConverter + .getAbstractPlanMapping().getMappedProg(n.getID())[1]; + pfpb.setDataPartitioner(PDataPartitioner.valueOf(initPlan)); + LOG.debug(getOptMode()+" OPT: forced 'set data partitioner' - result=" + initPlan ); } - else - super.rewriteSetDataPartitioner(n, vars, partitionedMatrices, thetaM); - + return blockwise; } http://git-wip-us.apache.org/repos/asf/systemml/blob/9178a954/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java index 154109a..9456703 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java @@ -100,7 +100,6 @@ import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyze import org.apache.sysml.runtime.instructions.Instruction; import org.apache.sysml.runtime.instructions.cp.Data; import org.apache.sysml.runtime.instructions.cp.FunctionCallCPInstruction; -import org.apache.sysml.runtime.instructions.gpu.context.GPUContext; import org.apache.sysml.runtime.instructions.gpu.context.GPUContextPool; import org.apache.sysml.runtime.instructions.spark.data.RDDObject; import org.apache.sysml.runtime.matrix.MatrixCharacteristics; @@ -1505,7 +1504,7 @@ public class OptimizerRuleBased extends Optimizer && partitionedMatrices.size()==1 ) //only one partitioned matrix { ParForProgramBlock pfpb = (ParForProgramBlock) OptTreeConverter - .getAbstractPlanMapping().getMappedProg(pn.getID())[1]; + .getAbstractPlanMapping().getMappedProg(pn.getID())[1]; //partitioned matrix String moVarname = partitionedMatrices.keySet().iterator().next(); http://git-wip-us.apache.org/repos/asf/systemml/blob/9178a954/src/main/java/org/apache/sysml/runtime/matrix/mapred/MRJobConfiguration.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/mapred/MRJobConfiguration.java b/src/main/java/org/apache/sysml/runtime/matrix/mapred/MRJobConfiguration.java index 88512b6..5cd5daf 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/mapred/MRJobConfiguration.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/mapred/MRJobConfiguration.java @@ -563,61 +563,42 @@ public class MRJobConfiguration //set transpose sparse column vector job.setBoolean(PARTITIONING_OUTPUT_KEEP_INDEXES_CONFIG, keepIndexes); - } - public static long getPartitioningNumRows( JobConf job ) - { - return Long.parseLong(job.get(PARTITIONING_INPUT_MATRIX_NUM_ROW_CONFIG)); + public static MatrixCharacteristics getPartitionedMatrixSize(JobConf job) { + return new MatrixCharacteristics( + Long.parseLong(job.get(PARTITIONING_INPUT_MATRIX_NUM_ROW_CONFIG)), + Long.parseLong(job.get(PARTITIONING_INPUT_MATRIX_NUM_COLUMN_CONFIG)), + Integer.parseInt(job.get(PARTITIONING_INPUT_BLOCK_NUM_ROW_CONFIG)), + Integer.parseInt(job.get(PARTITIONING_INPUT_BLOCK_NUM_COLUMN_CONFIG))); } - public static long getPartitioningNumCols( JobConf job ) - { - return Long.parseLong(job.get(PARTITIONING_INPUT_MATRIX_NUM_COLUMN_CONFIG)); - } - public static void 
setPartitioningBlockNumRows( JobConf job, int brlen ) - { + public static void setPartitioningBlockNumRows( JobConf job, int brlen ) { job.set(PARTITIONING_INPUT_BLOCK_NUM_ROW_CONFIG, String.valueOf(brlen)); } - public static int getPartitioningBlockNumRows( JobConf job ) - { - return Integer.parseInt(job.get(PARTITIONING_INPUT_BLOCK_NUM_ROW_CONFIG)); - } - - public static void setPartitioningBlockNumCols( JobConf job, int bclen ) - { + public static void setPartitioningBlockNumCols( JobConf job, int bclen ) { job.set(PARTITIONING_INPUT_BLOCK_NUM_COLUMN_CONFIG,String.valueOf(bclen)); } - public static int getPartitioningBlockNumCols( JobConf job ) - { - return Integer.parseInt(job.get(PARTITIONING_INPUT_BLOCK_NUM_COLUMN_CONFIG)); - } - - public static InputInfo getPartitioningInputInfo( JobConf job ) - { + public static InputInfo getPartitioningInputInfo( JobConf job ) { return InputInfo.stringToInputInfo(job.get(PARTITIONING_INPUT_INFO_CONFIG)); } - public static OutputInfo getPartitioningOutputInfo( JobConf job ) - { + public static OutputInfo getPartitioningOutputInfo( JobConf job ) { return OutputInfo.stringToOutputInfo(job.get(PARTITIONING_OUTPUT_INFO_CONFIG)); } - - public static void setPartitioningFormat( JobConf job, PDataPartitionFormat dpf ) - { + + public static void setPartitioningFormat( JobConf job, PDataPartitionFormat dpf ) { job.set(PARTITIONING_OUTPUT_FORMAT_CONFIG, dpf.toString()); } - public static PDataPartitionFormat getPartitioningFormat( JobConf job ) - { + public static PDataPartitionFormat getPartitioningFormat( JobConf job ) { return PDataPartitionFormat.valueOf(job.get(PARTITIONING_OUTPUT_FORMAT_CONFIG)); } - public static int getPartitioningSizeN( JobConf job ) - { + public static int getPartitioningSizeN( JobConf job ) { return Integer.parseInt(job.get(PARTITIONING_OUTPUT_N_CONFIG)); } http://git-wip-us.apache.org/repos/asf/systemml/blob/9178a954/src/test/java/org/apache/sysml/test/integration/functions/parfor/ParForDataPartitionExecuteTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/sysml/test/integration/functions/parfor/ParForDataPartitionExecuteTest.java b/src/test/java/org/apache/sysml/test/integration/functions/parfor/ParForDataPartitionExecuteTest.java new file mode 100644 index 0000000..80c7568 --- /dev/null +++ b/src/test/java/org/apache/sysml/test/integration/functions/parfor/ParForDataPartitionExecuteTest.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.sysml.test.integration.functions.parfor; + +import java.util.HashMap; + +import org.junit.Assert; +import org.junit.Test; + +import org.apache.sysml.api.DMLScript; +import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM; +import org.apache.sysml.lops.LopProperties.ExecType; +import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer; +import org.apache.sysml.runtime.matrix.data.MatrixValue.CellIndex; +import org.apache.sysml.test.integration.AutomatedTestBase; +import org.apache.sysml.test.integration.TestConfiguration; +import org.apache.sysml.test.utils.TestUtils; + +public class ParForDataPartitionExecuteTest extends AutomatedTestBase +{ + private final static String TEST_NAME = "DataPartitionExecute"; + private final static String TEST_DIR = "functions/parfor/"; + private final static String TEST_CLASS_DIR = TEST_DIR + ParForDataPartitionExecuteTest.class.getSimpleName() + "/"; + private final static double eps = 1e-10; + + private final static int dim1 = 2001; + private final static int dim2 = 101; + private final static double sparsity1 = 0.7; + private final static double sparsity2 = 0.3d; + + @Override + public void setUp() { + addTestConfiguration(TEST_NAME, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME, new String[]{"R"})); + } + + @Test + public void testFusedDataPartitionExecuteRowDenseMR() { + runFusedDataPartitionExecuteTest(false, true, ExecType.MR); + } + + @Test + public void testFusedDataPartitionExecuteColDenseMR() { + runFusedDataPartitionExecuteTest(false, false, ExecType.MR); + } + + @Test + public void testFusedDataPartitionExecuteRowSparseMR() { + runFusedDataPartitionExecuteTest(true, true, ExecType.MR); + } + + @Test + public void testFusedDataPartitionExecuteColSparseMR() { + runFusedDataPartitionExecuteTest(true, false, ExecType.MR); + } + + @Test + public void testFusedDataPartitionExecuteRowDenseSpark() { + runFusedDataPartitionExecuteTest(false, true, ExecType.SPARK); + } + + @Test + public void testFusedDataPartitionExecuteColDenseSpark() { + runFusedDataPartitionExecuteTest(false, false, ExecType.SPARK); + } + + @Test + public void testFusedDataPartitionExecuteRowSparseSpark() { + runFusedDataPartitionExecuteTest(true, true, ExecType.SPARK); + } + + @Test + public void testFusedDataPartitionExecuteColSparseSpark() { + runFusedDataPartitionExecuteTest(true, false, ExecType.SPARK); + } + + private void runFusedDataPartitionExecuteTest(boolean sparse, boolean row, ExecType et) + { + RUNTIME_PLATFORM platformOld = rtplatform; + switch( et ){ + case MR: rtplatform = RUNTIME_PLATFORM.HYBRID; break; + case SPARK: rtplatform = RUNTIME_PLATFORM.HYBRID_SPARK; break; + default: throw new RuntimeException("Unsupported exec type: "+et.name()); + } + boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG; + if( et == ExecType.SPARK ) + DMLScript.USE_LOCAL_SPARK_CONFIG = true; + + //modify memory budget to trigger fused datapartition-execute + long oldmem = InfrastructureAnalyzer.getLocalMaxMemory(); + InfrastructureAnalyzer.setLocalMaxMemory(1*1024*1024); //1MB + + try + { + int rows = row ? dim2 : dim1; + int cols = row ? 
dim1 : dim2; + + TestConfiguration config = getTestConfiguration(TEST_NAME); + config.addVariable("rows", rows); + config.addVariable("cols", cols); + loadTestConfiguration(config); + + String HOME = SCRIPT_DIR + TEST_DIR; + fullDMLScriptName = HOME + TEST_NAME + ".dml"; + programArgs = new String[]{"-stats", "-args", input("X"), + String.valueOf(et == ExecType.SPARK).toUpperCase(), + String.valueOf(row).toUpperCase(), output("R") }; + + fullRScriptName = HOME + TEST_NAME + ".R"; + rCmd = "Rscript" + " " + fullRScriptName + " " + inputDir() + + " " + String.valueOf(row).toUpperCase() + " " + expectedDir(); + + //generate input data + double sparsity = sparse ? sparsity2 : sparsity1; + double[][] X = getRandomMatrix(rows, cols, 0, 1, sparsity, 7); + writeInputMatrixWithMTD("X", X, true); + + //run test case + runTest(true, false, null, -1); + runRScript(true); + + //compare matrices + HashMap<CellIndex, Double> dmlfile = readDMLMatrixFromHDFS("R"); + HashMap<CellIndex, Double> rfile = readRMatrixFromFS("R"); + TestUtils.compareMatrices(dmlfile, rfile, eps, "DML", "R"); + + //check for compiled datapartition-execute + Assert.assertTrue(heavyHittersContainsSubString( + (et == ExecType.SPARK) ? "ParFor-DPESP" : "MR-Job_ParFor-DPEMR")); + } + finally { + rtplatform = platformOld; + DMLScript.USE_LOCAL_SPARK_CONFIG = sparkConfigOld; + InfrastructureAnalyzer.setLocalMaxMemory(oldmem); //1MB + } + } +} http://git-wip-us.apache.org/repos/asf/systemml/blob/9178a954/src/test/scripts/functions/parfor/DataPartitionExecute.R ---------------------------------------------------------------------- diff --git a/src/test/scripts/functions/parfor/DataPartitionExecute.R b/src/test/scripts/functions/parfor/DataPartitionExecute.R new file mode 100644 index 0000000..8534606 --- /dev/null +++ b/src/test/scripts/functions/parfor/DataPartitionExecute.R @@ -0,0 +1,41 @@ +#------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# +#------------------------------------------------------------- + + +args <- commandArgs(TRUE) +options(digits=22) + +library("Matrix") + +X <- as.matrix(readMM(paste(args[1], "X.mtx", sep=""))) +R = matrix(0, nrow(X), ncol(X)); + +if( as.logical(args[2]) ) { + for(i in 1:nrow(X)) { + R[i,] = pmin(X[i,], sum(X[i,])); + } +} else { + for(i in 1:ncol(X)) { + R[,i] = pmin(X[,i], sum(X[,i])); + } +} + +writeMM(as(R, "CsparseMatrix"), paste(args[3], "R", sep="")); http://git-wip-us.apache.org/repos/asf/systemml/blob/9178a954/src/test/scripts/functions/parfor/DataPartitionExecute.dml ---------------------------------------------------------------------- diff --git a/src/test/scripts/functions/parfor/DataPartitionExecute.dml b/src/test/scripts/functions/parfor/DataPartitionExecute.dml new file mode 100644 index 0000000..0fd2299 --- /dev/null +++ b/src/test/scripts/functions/parfor/DataPartitionExecute.dml @@ -0,0 +1,42 @@ +#------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +#------------------------------------------------------------- + +X = read($1); +R = matrix(0, nrow(X), ncol(X)); + +if( $2 ) { + if( $3 ) + parfor(i in 1:nrow(X), opt=CONSTRAINED, mode=REMOTE_SPARK_DP, datapartitioner=REMOTE_SPARK) + R[i,] = min(X[i,], sum(X[i,])); + else + parfor(i in 1:ncol(X), opt=CONSTRAINED, mode=REMOTE_SPARK_DP, datapartitioner=REMOTE_SPARK) + R[,i] = min(X[,i], sum(X[,i])); +} +else { + if( $3 ) + parfor(i in 1:nrow(X), opt=CONSTRAINED, mode=REMOTE_MR_DP, datapartitioner=REMOTE_MR) + R[i,] = min(X[i,], sum(X[i,])); + else + parfor(i in 1:ncol(X), opt=CONSTRAINED, mode=REMOTE_MR_DP, datapartitioner=REMOTE_MR) + R[,i] = min(X[,i], sum(X[,i])); +} + +write(R, $4); http://git-wip-us.apache.org/repos/asf/systemml/blob/9178a954/src/test_suites/java/org/apache/sysml/test/integration/functions/parfor/ZPackageSuite.java ---------------------------------------------------------------------- diff --git a/src/test_suites/java/org/apache/sysml/test/integration/functions/parfor/ZPackageSuite.java b/src/test_suites/java/org/apache/sysml/test/integration/functions/parfor/ZPackageSuite.java index 976b23f..c898a7a 100644 --- a/src/test_suites/java/org/apache/sysml/test/integration/functions/parfor/ZPackageSuite.java +++ b/src/test_suites/java/org/apache/sysml/test/integration/functions/parfor/ZPackageSuite.java @@ -30,6 +30,7 @@ import org.junit.runners.Suite; ParForAdversarialLiteralsTest.class, ParForBlockwiseDataPartitioningTest.class, ParForColwiseDataPartitioningTest.class, + ParForDataPartitionExecuteTest.class, ParForDataPartitionLeftIndexingTest.class, ParForDependencyAnalysisTest.class, ParForFunctionSerializationTest.class,

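As a footnote to the constrained-optimizer part of this change, here is a standalone sketch (hypothetical names, heavily simplified) of the reordered rewriteSetDataPartitioner in OptimizerConstrained: the rule-based rewrite now runs first so that its partitioning analysis is available, and a user-forced partitioner is re-applied afterwards; previously the forced branch skipped the analysis entirely, which is why the fused data-partition-execute job was never selected.

import java.util.HashMap;
import java.util.Map;

// Hypothetical, heavily simplified stand-in for the ordering fix in
// OptimizerConstrained.rewriteSetDataPartitioner (see the diff above).
public class ConstrainedRewriteSketch {
    enum PDataPartitioner { UNSPECIFIED, REMOTE_MR, REMOTE_SPARK }

    // chosen partitioner and the matrices selected for partitioning
    private PDataPartitioner partitioner = PDataPartitioner.UNSPECIFIED;
    private final Map<String, String> partitionedMatrices = new HashMap<>();

    // Stand-in for the rule-based rewrite: picks a partitioner and records
    // which matrix gets partitioned and how (one illustrative entry).
    private boolean ruleBasedRewrite() {
        partitioner = PDataPartitioner.REMOTE_SPARK;
        partitionedMatrices.put("X", "COLUMN_WISE");
        return true; // "blockwise" decision of the analysis
    }

    // Constrained variant after the fix: run the analysis first, then
    // re-apply a user-forced partitioner on top of its result.
    private boolean constrainedRewrite(PDataPartitioner forced) {
        boolean blockwise = ruleBasedRewrite(); // analysis always runs now
        if (forced != PDataPartitioner.UNSPECIFIED)
            partitioner = forced; // constraint awareness: keep the forced choice
        return blockwise;
    }

    public static void main(String[] args) {
        ConstrainedRewriteSketch opt = new ConstrainedRewriteSketch();
        opt.constrainedRewrite(PDataPartitioner.REMOTE_MR);
        // partitionedMatrices is populated even though the partitioner was
        // forced, so a downstream rewrite that requires exactly one partitioned
        // matrix (the fused data-partition-execute job) can still fire.
        System.out.println(opt.partitioner + " " + opt.partitionedMatrices);
    }
}

In the actual patch, the forced value is read from the plan's DATA_PARTITIONER parameter before the rule-based rewrite runs, since that rewrite may overwrite it.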