[SYSTEMML-2046] Large dense blocks in parfor result merge, cleanups. This patch adds support for large dense blocks in parfor result merge. As a side benefit, it also improves memory efficiency by avoiding two-dimensional dense compare blocks (with overhead per row) and by using shallow copies for existing dense block representations.
In addition, this also includes a fix for converting arbitrary matrix blocks to dense blocks, which produced incorrect results for sparse-dense conversions. Project: http://git-wip-us.apache.org/repos/asf/systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/53014ddd Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/53014ddd Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/53014ddd Branch: refs/heads/master Commit: 53014dddccd485dcc25a990e19b2a7af051fed24 Parents: 7cb9566 Author: Matthias Boehm <[email protected]> Authored: Fri Jan 5 18:55:56 2018 -0800 Committer: Matthias Boehm <[email protected]> Committed: Fri Jan 5 18:55:56 2018 -0800 ---------------------------------------------------------------------- .../controlprogram/parfor/ResultMerge.java | 84 +++++++--------- .../parfor/ResultMergeLocalAutomatic.java | 6 +- .../parfor/ResultMergeLocalFile.java | 100 ++++++------------- .../parfor/ResultMergeLocalMemory.java | 34 +++---- .../parfor/ResultMergeRemoteReducer.java | 89 +++++++---------- .../parfor/ResultMergeRemoteSparkWCompare.java | 12 +-- .../parfor/ResultMergeTaggedMatrixIndexes.java | 35 ++----- .../sysml/runtime/util/DataConverter.java | 2 +- 8 files changed, 139 insertions(+), 223 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/systemml/blob/53014ddd/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMerge.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMerge.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMerge.java index 86c60dd..333b679 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMerge.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMerge.java @@ -26,7 +26,7 @@ import org.apache.commons.logging.LogFactory; import 
org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.controlprogram.caching.MatrixObject; -import org.apache.sysml.runtime.matrix.MatrixCharacteristics; +import org.apache.sysml.runtime.matrix.data.DenseBlock; import org.apache.sysml.runtime.matrix.data.MatrixBlock; /** @@ -37,7 +37,6 @@ import org.apache.sysml.runtime.matrix.data.MatrixBlock; */ public abstract class ResultMerge { - protected static final Log LOG = LogFactory.getLog(ResultMerge.class.getName()); protected static final String NAME_SUFFIX = "_rm"; @@ -47,13 +46,11 @@ public abstract class ResultMerge protected MatrixObject[] _inputs = null; protected String _outputFName = null; - protected ResultMerge( ) - { - + protected ResultMerge( ) { + //do nothing } - public ResultMerge( MatrixObject out, MatrixObject[] in, String outputFilename ) - { + public ResultMerge( MatrixObject out, MatrixObject[] in, String outputFilename ) { _output = out; _inputs = in; _outputFName = outputFilename; @@ -90,11 +87,9 @@ public abstract class ResultMerge * @param appendOnly ? * @throws DMLRuntimeException if DMLRuntimeException occurs */ - protected void mergeWithoutComp( MatrixBlock out, MatrixBlock in, boolean appendOnly ) - throws DMLRuntimeException - { + protected void mergeWithoutComp( MatrixBlock out, MatrixBlock in, boolean appendOnly ) throws DMLRuntimeException { //pass through to matrix block operations - out.merge(in, appendOnly); + out.merge(in, appendOnly); } /** @@ -106,7 +101,7 @@ public abstract class ResultMerge * @param compare ? 
* @throws DMLRuntimeException if DMLRuntimeException occurs */ - protected void mergeWithComp( MatrixBlock out, MatrixBlock in, double[][] compare ) + protected void mergeWithComp( MatrixBlock out, MatrixBlock in, DenseBlock compare ) throws DMLRuntimeException { //Notes for result correctness: @@ -115,51 +110,48 @@ public abstract class ResultMerge // * Explicit NaN awareness because for cases were original matrix contains // NaNs, since NaN != NaN, otherwise we would potentially overwrite results - if( in.isInSparseFormat() ) //sparse input format - { + if( in.isEmptyBlock(false) ) { + for( int i=0; i<in.getNumRows(); i++ ) + for( int j=0; j<in.getNumColumns(); j++ ) + if( compare.get(i, j) != 0 ) + out.quickSetValue(i, j, 0); + } + else if( in.isInSparseFormat() ) { //SPARSE int rows = in.getNumRows(); int cols = in.getNumColumns(); for( int i=0; i<rows; i++ ) - for( int j=0; j<cols; j++ ) - { - double value = in.getValueSparseUnsafe(i,j); //input value - if( (value != compare[i][j] && !Double.isNaN(value) ) //for new values only (div) - || Double.isNaN(value) != Double.isNaN(compare[i][j]) ) //NaN awareness + for( int j=0; j<cols; j++ ) { + double value = in.getValueSparseUnsafe(i,j); //input value + if( (value != compare.get(i,j) && !Double.isNaN(value) ) //for new values only (div) + || Double.isNaN(value) != Double.isNaN(compare.get(i,j)) ) //NaN awareness { - out.quickSetValue( i, j, value ); + out.quickSetValue(i, j, value); } } } - else //dense input format - { - //for a merge this case will seldom happen, as each input MatrixObject - //has at most 1/numThreads of all values in it. 
+ else { //DENSE + //guaranteed allocated due to empty case above + DenseBlock a = in.getDenseBlock(); int rows = in.getNumRows(); int cols = in.getNumColumns(); - for( int i=0; i<rows; i++ ) - for( int j=0; j<cols; j++ ) - { - double value = in.getValueDenseUnsafe(i,j); //input value - if( (value != compare[i][j] && !Double.isNaN(value) ) //for new values only (div) - || Double.isNaN(value) != Double.isNaN(compare[i][j]) ) //NaN awareness - { - out.quickSetValue( i, j, value ); - } + for( int i=0; i<rows; i++ ) { + double[] avals = a.values(i); + int aix = a.pos(i); + for( int j=0; j<cols; j++ ) { + double value = avals[aix+j]; //input value + if( (value != compare.get(i,j) && !Double.isNaN(value) ) //for new values only (div) + || Double.isNaN(value) != Double.isNaN(compare.get(i,j)) ) //NaN awareness + { + out.quickSetValue( i, j, value ); + } } - } + } + } } - protected long computeNonZeros( MatrixObject out, List<MatrixObject> in ) - { - MatrixCharacteristics mc = out.getMatrixCharacteristics(); - long outNNZ = mc.getNonZeros(); - long ret = outNNZ; - for( MatrixObject tmp : in ) { - MatrixCharacteristics tmpmc = tmp.getMatrixCharacteristics(); - long inNNZ = tmpmc.getNonZeros(); - ret += (inNNZ - outNNZ); - } - - return ret; + protected long computeNonZeros( MatrixObject out, List<MatrixObject> in ) { + //sum of nnz of input (worker result) - output var existing nnz + return -(in.size() * out.getMatrixCharacteristics().getNonZeros()) + + in.stream().mapToLong(m -> m.getMatrixCharacteristics().getNonZeros()).sum(); } } http://git-wip-us.apache.org/repos/asf/systemml/blob/53014ddd/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalAutomatic.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalAutomatic.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalAutomatic.java index fe190ed..fec05c7 
100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalAutomatic.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalAutomatic.java @@ -29,11 +29,9 @@ import org.apache.sysml.runtime.matrix.MatrixCharacteristics; public class ResultMergeLocalAutomatic extends ResultMerge { - private ResultMerge _rm = null; - public ResultMergeLocalAutomatic( MatrixObject out, MatrixObject[] in, String outputFilename ) - { + public ResultMergeLocalAutomatic( MatrixObject out, MatrixObject[] in, String outputFilename ) { super( out, in, outputFilename ); } @@ -72,6 +70,6 @@ public class ResultMergeLocalAutomatic extends ResultMerge else _rm = new ResultMergeLocalFile( _output, _inputs, _outputFName ); - return _rm.executeParallelMerge(par); + return _rm.executeParallelMerge(par); } } http://git-wip-us.apache.org/repos/asf/systemml/blob/53014ddd/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalFile.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalFile.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalFile.java index f245cbf..af77783 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalFile.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalFile.java @@ -50,6 +50,7 @@ import org.apache.sysml.runtime.controlprogram.parfor.util.StagingFileUtils; import org.apache.sysml.runtime.io.IOUtilFunctions; import org.apache.sysml.runtime.matrix.MatrixCharacteristics; import org.apache.sysml.runtime.matrix.MetaDataFormat; +import org.apache.sysml.runtime.matrix.data.DenseBlock; import org.apache.sysml.runtime.matrix.data.IJV; import org.apache.sysml.runtime.matrix.data.InputInfo; import org.apache.sysml.runtime.matrix.data.MatrixBlock; @@ -70,9 +71,8 @@ import 
org.apache.sysml.runtime.util.MapReduceTool; */ public class ResultMergeLocalFile extends ResultMerge { - //NOTE: if we allow simple copies, this might result in a scattered file and many MR tasks for subsequent jobs - public static final boolean ALLOW_COPY_CELLFILES = false; + public static final boolean ALLOW_COPY_CELLFILES = false; //internal comparison matrix private IDSequence _seq = null; @@ -681,11 +681,8 @@ public class ResultMergeLocalFile extends ResultMerge throw new DMLRuntimeException("Unable to merge results because multiple compare blocks found."); mb = LocalFileUtils.readMatrixBlockFromLocal( dir2+"/"+lnames2[0] ); boolean appendOnly = mb.isInSparseFormat(); - double[][] compare = DataConverter.convertToDoubleMatrix(mb); - - String[] lnames = dir.list(); - for( String lname : lnames ) - { + DenseBlock compare = DataConverter.convertToDenseBlock(mb, false); + for( String lname : dir.list() ) { MatrixBlock tmp = LocalFileUtils.readMatrixBlockFromLocal( dir+"/"+lname ); mergeWithComp(mb, tmp, compare); } @@ -700,21 +697,17 @@ public class ResultMergeLocalFile extends ResultMerge else //WITHOUT COMPARE BLOCK { //copy all non-zeros from all workers - String[] lnames = dir.list(); boolean appendOnly = false; - for( String lname : lnames ) - { - if( mb == null ) - { + for( String lname : dir.list() ) { + if( mb == null ) { mb = LocalFileUtils.readMatrixBlockFromLocal( dir+"/"+lname ); appendOnly = mb.isInSparseFormat(); } - else - { + else { MatrixBlock tmp = LocalFileUtils.readMatrixBlockFromLocal( dir+"/"+lname ); mergeWithoutComp(mb, tmp, appendOnly); } - } + } //sort sparse due to append-only if( appendOnly ) @@ -724,19 +717,17 @@ public class ResultMergeLocalFile extends ResultMerge mb.examSparsity(); } } - else - { + else { //NOTE: whenever runtime does not need all blocks anymore, this can be removed int maxRow = (int)(((brow-1)*brlen + brlen < rlen) ? brlen : rlen - (brow-1)*brlen); int maxCol = (int)(((bcol-1)*bclen + bclen < clen) ? 
bclen : clen - (bcol-1)*bclen); - mb = new MatrixBlock(maxRow, maxCol, true); - } + } //mb.examSparsity(); //done on write anyway and mb not reused indexes.setIndexes(brow, bcol); writer.append(indexes, mb); - } + } } finally { IOUtilFunctions.closeSilently(writer); @@ -755,10 +746,8 @@ public class ResultMergeLocalFile extends ResultMerge long clen = mc.getCols(); int brlen = mc.getRowsPerBlock(); int bclen = mc.getColsPerBlock(); - - BufferedWriter out = new BufferedWriter(new OutputStreamWriter(fs.create(path,true))); - try - { + + try( BufferedWriter out = new BufferedWriter(new OutputStreamWriter(fs.create(path,true))) ) { //for obj reuse and preventing repeated buffer re-allocations StringBuilder sb = new StringBuilder(); @@ -784,11 +773,8 @@ public class ResultMergeLocalFile extends ResultMerge throw new DMLRuntimeException("Unable to merge results because multiple compare blocks found."); mb = StagingFileUtils.readCellList2BlockFromLocal( dir2+"/"+lnames2[0], brlen, bclen ); boolean appendOnly = mb.isInSparseFormat(); - double[][] compare = DataConverter.convertToDoubleMatrix(mb); - - String[] lnames = dir.list(); - for( String lname : lnames ) - { + DenseBlock compare = DataConverter.convertToDenseBlock(mb, false); + for( String lname : dir.list() ) { MatrixBlock tmp = StagingFileUtils.readCellList2BlockFromLocal( dir+"/"+lname, brlen, bclen ); mergeWithComp(mb, tmp, compare); } @@ -803,17 +789,13 @@ public class ResultMergeLocalFile extends ResultMerge else //WITHOUT COMPARE BLOCK { //copy all non-zeros from all workers - String[] lnames = dir.list(); boolean appendOnly = false; - for( String lname : lnames ) - { - if( mb == null ) - { + for( String lname : dir.list() ) { + if( mb == null ) { mb = StagingFileUtils.readCellList2BlockFromLocal( dir+"/"+lname, brlen, bclen ); appendOnly = mb.isInSparseFormat(); } - else - { + else { MatrixBlock tmp = StagingFileUtils.readCellList2BlockFromLocal( dir+"/"+lname, brlen, bclen ); mergeWithoutComp(mb, tmp, 
appendOnly); } @@ -831,11 +813,9 @@ public class ResultMergeLocalFile extends ResultMerge //write the block to text cell if( mb!=null ) { - if( mb.isInSparseFormat() ) - { + if( mb.isInSparseFormat() ) { Iterator<IJV> iter = mb.getSparseBlockIterator(); - while( iter.hasNext() ) - { + while( iter.hasNext() ) { IJV lcell = iter.next(); sb.append(row_offset+lcell.getI()); sb.append(' '); @@ -843,13 +823,12 @@ public class ResultMergeLocalFile extends ResultMerge sb.append(' '); sb.append(lcell.getV()); sb.append('\n'); - out.write( sb.toString() ); + out.write( sb.toString() ); sb.setLength(0); written = true; - } + } } - else - { + else { for( int i=0; i<brlen; i++ ) for( int j=0; j<bclen; j++ ) { @@ -868,15 +847,12 @@ public class ResultMergeLocalFile extends ResultMerge } } } - } + } } if( !written ) out.write("1 1 0\n"); } - finally { - IOUtilFunctions.closeSilently(out); - } } @SuppressWarnings("deprecation") @@ -892,10 +868,9 @@ public class ResultMergeLocalFile extends ResultMerge long clen = mc.getCols(); int brlen = mc.getRowsPerBlock(); int bclen = mc.getColsPerBlock(); - MatrixIndexes indexes = new MatrixIndexes(1,1); - MatrixCell cell = new MatrixCell(0); + MatrixCell cell = new MatrixCell(0); SequenceFile.Writer out = new SequenceFile.Writer(fs, job, path, MatrixIndexes.class, MatrixCell.class); //beware ca 50ms try @@ -922,11 +897,8 @@ public class ResultMergeLocalFile extends ResultMerge throw new DMLRuntimeException("Unable to merge results because multiple compare blocks found."); mb = StagingFileUtils.readCellList2BlockFromLocal( dir2+"/"+lnames2[0], brlen, bclen ); boolean appendOnly = mb.isInSparseFormat(); - double[][] compare = DataConverter.convertToDoubleMatrix(mb); - - String[] lnames = dir.list(); - for( String lname : lnames ) - { + DenseBlock compare = DataConverter.convertToDenseBlock(mb, false); + for( String lname : dir.list() ) { MatrixBlock tmp = StagingFileUtils.readCellList2BlockFromLocal( dir+"/"+lname, brlen, bclen ); 
mergeWithComp(mb, tmp, compare); } @@ -941,21 +913,17 @@ public class ResultMergeLocalFile extends ResultMerge else //WITHOUT COMPARE BLOCK { //copy all non-zeros from all workers - String[] lnames = dir.list(); boolean appendOnly = false; - for( String lname : lnames ) - { - if( mb == null ) - { + for( String lname : dir.list() ) { + if( mb == null ) { mb = StagingFileUtils.readCellList2BlockFromLocal( dir+"/"+lname, brlen, bclen ); appendOnly = mb.isInSparseFormat(); } - else - { + else { MatrixBlock tmp = StagingFileUtils.readCellList2BlockFromLocal( dir+"/"+lname, brlen, bclen ); mergeWithoutComp(mb, tmp, appendOnly); } - } + } //sort sparse due to append-only if( appendOnly ) @@ -972,8 +940,7 @@ public class ResultMergeLocalFile extends ResultMerge if( mb.isInSparseFormat() ) { Iterator<IJV> iter = mb.getSparseBlockIterator(); - while( iter.hasNext() ) - { + while( iter.hasNext() ) { IJV lcell = iter.next(); indexes.setIndexes(row_offset+lcell.getI(), col_offset+lcell.getJ()); cell.setValue(lcell.getV()); @@ -996,7 +963,7 @@ public class ResultMergeLocalFile extends ResultMerge } } } - } + } } if( !written ) @@ -1031,5 +998,4 @@ public class ResultMergeLocalFile extends ResultMerge fs.rename(tmpPath, new Path(fnameNew+"/"+lname+seq.getNextID())); } } - } http://git-wip-us.apache.org/repos/asf/systemml/blob/53014ddd/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalMemory.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalMemory.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalMemory.java index a31294e..991baca 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalMemory.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalMemory.java @@ -27,6 +27,7 @@ import 
org.apache.sysml.runtime.controlprogram.caching.MatrixObject; import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer; import org.apache.sysml.runtime.matrix.MatrixCharacteristics; import org.apache.sysml.runtime.matrix.MetaDataFormat; +import org.apache.sysml.runtime.matrix.data.DenseBlock; import org.apache.sysml.runtime.matrix.data.InputInfo; import org.apache.sysml.runtime.matrix.data.MatrixBlock; import org.apache.sysml.runtime.matrix.data.OutputInfo; @@ -40,13 +41,11 @@ import org.apache.sysml.runtime.util.DataConverter; * */ public class ResultMergeLocalMemory extends ResultMerge -{ - +{ //internal comparison matrix - private double[][] _compare = null; + private DenseBlock _compare = null; - public ResultMergeLocalMemory( MatrixObject out, MatrixObject[] in, String outputFilename ) - { + public ResultMergeLocalMemory( MatrixObject out, MatrixObject[] in, String outputFilename ) { super( out, in, outputFilename ); } @@ -73,7 +72,7 @@ public class ResultMergeLocalMemory extends ResultMerge boolean appendOnly = outMBNew.isInSparseFormat(); //create compare matrix if required (existing data in result) - _compare = createCompareMatrix(outMB); + _compare = getCompareMatrix(outMB); if( _compare != null ) outMBNew.copy(outMB); @@ -88,7 +87,7 @@ public class ResultMergeLocalMemory extends ResultMerge LOG.trace("ResultMerge (local, in-memory): Merge input "+in.hashCode()+" (fname="+in.getFileName()+")"); //read/pin input_i - MatrixBlock inMB = in.acquireRead(); + MatrixBlock inMB = in.acquireRead(); //core merge merge( outMBNew, inMB, appendOnly ); @@ -117,15 +116,13 @@ public class ResultMergeLocalMemory extends ResultMerge outMBNew.examSparsity(); //create output - if( flagMerged ) - { + if( flagMerged ) { //create new output matrix //(e.g., to prevent potential export<->read file access conflict in specific cases of // local-remote nested parfor)) - moNew = createNewMatrixObject( outMBNew ); + moNew = createNewMatrixObject( outMBNew ); } 
- else - { + else { moNew = _output; //return old matrix, to prevent copy } @@ -172,7 +169,7 @@ public class ResultMergeLocalMemory extends ResultMerge outMBNew.allocateDenseBlockUnsafe((int)rows, (int)cols); //create compare matrix if required (existing data in result) - _compare = createCompareMatrix(outMB); + _compare = getCompareMatrix(outMB); if( _compare != null ) outMBNew.copy(outMB); @@ -201,10 +198,9 @@ public class ResultMergeLocalMemory extends ResultMerge //create new output matrix //(e.g., to prevent potential export<->read file access conflict in specific cases of // local-remote nested parfor)) - moNew = createNewMatrixObject( outMBNew ); + moNew = createNewMatrixObject( outMBNew ); } - else - { + else { moNew = _output; //return old matrix, to prevent copy } @@ -220,10 +216,10 @@ public class ResultMergeLocalMemory extends ResultMerge return moNew; } - private static double[][] createCompareMatrix( MatrixBlock output ) { + private static DenseBlock getCompareMatrix( MatrixBlock output ) { //create compare matrix only if required - if( output.getNonZeros() > 0 ) - return DataConverter.convertToDoubleMatrix( output ); + if( !output.isEmptyBlock(false) ) + return DataConverter.convertToDenseBlock(output, false); return null; } http://git-wip-us.apache.org/repos/asf/systemml/blob/53014ddd/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteReducer.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteReducer.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteReducer.java index 7d59230..e12a052 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteReducer.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteReducer.java @@ -34,6 +34,8 @@ import org.apache.hadoop.mapred.Reporter; import 
org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.controlprogram.caching.MatrixObject; +import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer; +import org.apache.sysml.runtime.matrix.data.DenseBlock; import org.apache.sysml.runtime.matrix.data.InputInfo; import org.apache.sysml.runtime.matrix.data.MatrixBlock; import org.apache.sysml.runtime.matrix.data.MatrixCell; @@ -52,13 +54,11 @@ import org.apache.sysml.runtime.util.DataConverter; */ public class ResultMergeRemoteReducer implements Reducer<Writable, Writable, Writable, Writable> -{ - +{ private ResultMergeReducer _reducer = null; - public ResultMergeRemoteReducer( ) - { - + public ResultMergeRemoteReducer( ) { + //do nothing } @Override @@ -83,20 +83,19 @@ public class ResultMergeRemoteReducer else if( ii == InputInfo.BinaryCellInputInfo ) _reducer = new ResultMergeReducerBinaryCell(requiresCompare); else if( ii == InputInfo.BinaryBlockInputInfo ) - _reducer = new ResultMergeReducerBinaryBlock(requiresCompare); + _reducer = new ResultMergeReducerBinaryBlock(requiresCompare, job); else throw new RuntimeException("Unable to configure mapper with unknown input info: "+ii.toString()); } @Override - public void close() throws IOException - { + public void close() throws IOException { //do nothing } private interface ResultMergeReducer //interface in order to allow ResultMergeReducerBinaryBlock to inherit from ResultMerge - { + { void processKeyValueList( Writable key, Iterator<Writable> valueList, OutputCollector<Writable, Writable> out, Reporter reporter ) throws IOException; } @@ -107,8 +106,7 @@ public class ResultMergeRemoteReducer private StringBuilder _sb = null; private Text _objValue = null; - public ResultMergeReducerTextCell(boolean requiresCompare) - { + public ResultMergeReducerTextCell(boolean requiresCompare) { _requiresCompare = requiresCompare; _sb = new StringBuilder(); _objValue = new Text(); @@ -152,7 +150,7 @@ public class 
ResultMergeRemoteReducer _sb.append(lvalue); _objValue.set( _sb.toString() ); _sb.setLength(0); - out.collect(NullWritable.get(), _objValue ); + out.collect(NullWritable.get(), _objValue ); found = true; break; //only one write per cell possible (independence) }// note: objs with equal value are directly discarded @@ -170,8 +168,8 @@ public class ResultMergeRemoteReducer _sb.append(' '); _sb.append(c.doubleValue()); _objValue.set( _sb.toString() ); - _sb.setLength(0); - out.collect(NullWritable.get(), _objValue ); + _sb.setLength(0); + out.collect(NullWritable.get(), _objValue ); break; //only one write per cell possible (independence) } } @@ -179,7 +177,7 @@ public class ResultMergeRemoteReducer else { MatrixIndexes key2 = (MatrixIndexes) key; - while( valueList.hasNext() ) + while( valueList.hasNext() ) { TaggedMatrixCell tVal = (TaggedMatrixCell) valueList.next(); MatrixCell value = (MatrixCell) tVal.getBaseObject(); @@ -190,13 +188,11 @@ public class ResultMergeRemoteReducer _sb.append(' '); _sb.append(value.getValue()); _objValue.set( _sb.toString() ); - _sb.setLength(0); - out.collect(NullWritable.get(), _objValue ); + _sb.setLength(0); + out.collect(NullWritable.get(), _objValue ); break; //only one write per cell possible (independence) } } - - } } @@ -205,8 +201,7 @@ public class ResultMergeRemoteReducer private boolean _requiresCompare; private MatrixCell _objValue; - public ResultMergeReducerBinaryCell(boolean requiresCompare) - { + public ResultMergeReducerBinaryCell(boolean requiresCompare) { _requiresCompare = requiresCompare; _objValue = new MatrixCell(); } @@ -241,7 +236,7 @@ public class ResultMergeRemoteReducer cellList.add( cVal.getValue() ); else if( cellCompare.doubleValue() != cVal.getValue() ) //compare on the fly { - out.collect(key, cVal ); + out.collect(key, cVal ); found = true; break; //only one write per cell possible (independence) }// note: objs with equal value are directly discarded @@ -250,21 +245,19 @@ public class 
ResultMergeRemoteReducer //result merge for objs before compare if( !found ) - for( Double c : cellList ) - if( !c.equals( cellCompare) ) - { + for( Double c : cellList ) + if( !c.equals( cellCompare) ) { _objValue.setValue(c.doubleValue()); - out.collect(key, _objValue ); + out.collect(key, _objValue ); break; //only one write per cell possible (independence) } } //without compare else { - while( valueList.hasNext() ) - { - TaggedMatrixCell tVal = (TaggedMatrixCell) valueList.next(); - out.collect((MatrixIndexes)key, (MatrixCell)tVal.getBaseObject()); + while( valueList.hasNext() ) { + TaggedMatrixCell tVal = (TaggedMatrixCell) valueList.next(); + out.collect((MatrixIndexes)key, (MatrixCell)tVal.getBaseObject()); break; //only one write per cell possible (independence) } } @@ -274,23 +267,20 @@ public class ResultMergeRemoteReducer private static class ResultMergeReducerBinaryBlock extends ResultMerge implements ResultMergeReducer { private boolean _requiresCompare; + private JobConf _job = null; - public ResultMergeReducerBinaryBlock(boolean requiresCompare) - { + public ResultMergeReducerBinaryBlock(boolean requiresCompare, JobConf job) { _requiresCompare = requiresCompare; + _job = job; } @Override - public MatrixObject executeParallelMerge(int par) - throws DMLRuntimeException - { + public MatrixObject executeParallelMerge(int par) throws DMLRuntimeException { throw new DMLRuntimeException("Unsupported operation."); } @Override - public MatrixObject executeSerialMerge() - throws DMLRuntimeException - { + public MatrixObject executeSerialMerge() throws DMLRuntimeException { throw new DMLRuntimeException("Unsupported operation."); } @@ -302,37 +292,33 @@ public class ResultMergeRemoteReducer { MatrixIndexes ixOut = ((ResultMergeTaggedMatrixIndexes)key).getIndexes(); MatrixBlock mbOut = null; - double[][] aCompare = null; + DenseBlock aCompare = null; boolean appendOnly = false; //get and prepare compare block if required - if( _requiresCompare ) - { + if( 
_requiresCompare ) { TaggedMatrixBlock tVal = (TaggedMatrixBlock) valueList.next(); MatrixBlock bVal = (MatrixBlock) tVal.getBaseObject(); if( tVal.getTag()!=ResultMergeRemoteMR.COMPARE_TAG ) throw new IOException("Failed to read compare block at expected first position."); - aCompare = DataConverter.convertToDoubleMatrix(bVal); + aCompare = DataConverter.convertToDenseBlock(bVal, + InfrastructureAnalyzer.isLocalMode(_job)); } //merge all result blocks into final result block - while( valueList.hasNext() ) - { + while( valueList.hasNext() ) { TaggedMatrixBlock tVal = (TaggedMatrixBlock) valueList.next(); MatrixBlock bVal = (MatrixBlock) tVal.getBaseObject(); - - if( mbOut == null ) //copy first block - { + if( mbOut == null ) { //copy first block mbOut = new MatrixBlock(); mbOut.copy( bVal ); appendOnly = mbOut.isInSparseFormat(); } - else //merge remaining blocks - { + else { //merge remaining blocks if( _requiresCompare ) mergeWithComp(mbOut, bVal, aCompare); else - mergeWithoutComp(mbOut, bVal, appendOnly); + mergeWithoutComp(mbOut, bVal, appendOnly); } } @@ -348,8 +334,7 @@ public class ResultMergeRemoteReducer catch( Exception ex ) { throw new IOException(ex); - } + } } - } } http://git-wip-us.apache.org/repos/asf/systemml/blob/53014ddd/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteSparkWCompare.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteSparkWCompare.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteSparkWCompare.java index 007f1ac..27cc894 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteSparkWCompare.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteSparkWCompare.java @@ -27,13 +27,13 @@ import scala.Tuple2; import org.apache.sysml.runtime.DMLRuntimeException; import 
org.apache.sysml.runtime.controlprogram.caching.MatrixObject; +import org.apache.sysml.runtime.matrix.data.DenseBlock; import org.apache.sysml.runtime.matrix.data.MatrixBlock; import org.apache.sysml.runtime.matrix.data.MatrixIndexes; import org.apache.sysml.runtime.util.DataConverter; public class ResultMergeRemoteSparkWCompare extends ResultMerge implements PairFunction<Tuple2<MatrixIndexes,Tuple2<Iterable<MatrixBlock>,MatrixBlock>>, MatrixIndexes, MatrixBlock> { - private static final long serialVersionUID = -5970805069405942836L; @Override @@ -45,7 +45,7 @@ public class ResultMergeRemoteSparkWCompare extends ResultMerge implements PairF MatrixBlock cin = arg._2()._2(); //create compare array - double[][] compare = DataConverter.convertToDoubleMatrix(cin); + DenseBlock compare = DataConverter.convertToDenseBlock(cin, false); //merge all blocks into compare block MatrixBlock out = new MatrixBlock(cin); @@ -57,16 +57,12 @@ public class ResultMergeRemoteSparkWCompare extends ResultMerge implements PairF } @Override - public MatrixObject executeSerialMerge() - throws DMLRuntimeException - { + public MatrixObject executeSerialMerge() throws DMLRuntimeException { throw new DMLRuntimeException("Unsupported operation."); } @Override - public MatrixObject executeParallelMerge(int par) - throws DMLRuntimeException - { + public MatrixObject executeParallelMerge(int par) throws DMLRuntimeException { throw new DMLRuntimeException("Unsupported operation."); } } http://git-wip-us.apache.org/repos/asf/systemml/blob/53014ddd/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeTaggedMatrixIndexes.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeTaggedMatrixIndexes.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeTaggedMatrixIndexes.java index 5ac1a79..962a327 100644 --- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeTaggedMatrixIndexes.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeTaggedMatrixIndexes.java @@ -38,35 +38,27 @@ import org.apache.sysml.runtime.matrix.data.MatrixIndexes; */ public class ResultMergeTaggedMatrixIndexes implements WritableComparable<ResultMergeTaggedMatrixIndexes> { - private MatrixIndexes _ix; private byte _tag = -1; - public ResultMergeTaggedMatrixIndexes() - { + public ResultMergeTaggedMatrixIndexes() { _ix = new MatrixIndexes(); } - public MatrixIndexes getIndexes() - { + public MatrixIndexes getIndexes() { return _ix; } - - public byte getTag() - { + public byte getTag() { return _tag; } - public void setTag(byte tag) - { + public void setTag(byte tag) { _tag = tag; } @Override - public void readFields(DataInput in) - throws IOException - { + public void readFields(DataInput in) throws IOException { if( _ix == null ) _ix = new MatrixIndexes(); _ix.readFields(in); @@ -74,33 +66,24 @@ public class ResultMergeTaggedMatrixIndexes implements WritableComparable<Result } @Override - public void write(DataOutput out) - throws IOException - { + public void write(DataOutput out) throws IOException { _ix.write(out); out.writeByte(_tag); } @Override - public int compareTo(ResultMergeTaggedMatrixIndexes that) - { + public int compareTo(ResultMergeTaggedMatrixIndexes that) { int ret = _ix.compareTo(that._ix); - if( ret == 0 ) - { ret = ((_tag == that._tag) ? 0 : - (_tag < that._tag)? -1 : 1); - } - + (_tag < that._tag)? 
-1 : 1); return ret; } @Override - public boolean equals(Object other) - { + public boolean equals(Object other) { if( !(other instanceof ResultMergeTaggedMatrixIndexes) ) return false; - ResultMergeTaggedMatrixIndexes that = (ResultMergeTaggedMatrixIndexes)other; return (_ix.equals(that._ix) && _tag == that._tag); } http://git-wip-us.apache.org/repos/asf/systemml/blob/53014ddd/src/main/java/org/apache/sysml/runtime/util/DataConverter.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/util/DataConverter.java b/src/main/java/org/apache/sysml/runtime/util/DataConverter.java index 5c3ad25..284e783 100644 --- a/src/main/java/org/apache/sysml/runtime/util/DataConverter.java +++ b/src/main/java/org/apache/sysml/runtime/util/DataConverter.java @@ -339,7 +339,7 @@ public class DataConverter Iterator<IJV> iter = mb.getSparseBlockIterator(); while( iter.hasNext() ) { IJV cell = iter.next(); - ret.set(cell.getI(), cols+cell.getJ(), cell.getV()); + ret.set(cell.getI(), cell.getJ(), cell.getV()); } } else if( deep ) {
