[SYSTEMML-2046] Large dense blocks in parfor result merge, cleanups

This patch adds support for large dense blocks in parfor result merge.
As a side benefit, it also improves memory efficiency by avoiding
two-dimensional dense compare blocks (which incur per-row overhead) and
by using shallow copies of existing dense block representations.

In addition, this patch includes a fix for converting arbitrary matrix
blocks to dense blocks, which previously produced incorrect results for
sparse-dense conversions.

Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/53014ddd
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/53014ddd
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/53014ddd

Branch: refs/heads/master
Commit: 53014dddccd485dcc25a990e19b2a7af051fed24
Parents: 7cb9566
Author: Matthias Boehm <[email protected]>
Authored: Fri Jan 5 18:55:56 2018 -0800
Committer: Matthias Boehm <[email protected]>
Committed: Fri Jan 5 18:55:56 2018 -0800

----------------------------------------------------------------------
 .../controlprogram/parfor/ResultMerge.java      |  84 +++++++---------
 .../parfor/ResultMergeLocalAutomatic.java       |   6 +-
 .../parfor/ResultMergeLocalFile.java            | 100 ++++++-------------
 .../parfor/ResultMergeLocalMemory.java          |  34 +++----
 .../parfor/ResultMergeRemoteReducer.java        |  89 +++++++----------
 .../parfor/ResultMergeRemoteSparkWCompare.java  |  12 +--
 .../parfor/ResultMergeTaggedMatrixIndexes.java  |  35 ++-----
 .../sysml/runtime/util/DataConverter.java       |   2 +-
 8 files changed, 139 insertions(+), 223 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/53014ddd/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMerge.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMerge.java 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMerge.java
index 86c60dd..333b679 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMerge.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMerge.java
@@ -26,7 +26,7 @@ import org.apache.commons.logging.LogFactory;
 
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
-import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
+import org.apache.sysml.runtime.matrix.data.DenseBlock;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 
 /**
@@ -37,7 +37,6 @@ import org.apache.sysml.runtime.matrix.data.MatrixBlock;
  */
 public abstract class ResultMerge 
 {
-       
        protected static final Log LOG = 
LogFactory.getLog(ResultMerge.class.getName());
        
        protected static final String NAME_SUFFIX = "_rm";
@@ -47,13 +46,11 @@ public abstract class ResultMerge
        protected MatrixObject[] _inputs      = null; 
        protected String         _outputFName = null;
        
-       protected ResultMerge( )
-       {
-               
+       protected ResultMerge( ) {
+               //do nothing
        }
        
-       public ResultMerge( MatrixObject out, MatrixObject[] in, String 
outputFilename )
-       {
+       public ResultMerge( MatrixObject out, MatrixObject[] in, String 
outputFilename ) {
                _output = out;
                _inputs = in;
                _outputFName = outputFilename;
@@ -90,11 +87,9 @@ public abstract class ResultMerge
         * @param appendOnly ?
         * @throws DMLRuntimeException if DMLRuntimeException occurs
         */
-       protected void mergeWithoutComp( MatrixBlock out, MatrixBlock in, 
boolean appendOnly ) 
-               throws DMLRuntimeException
-       {
+       protected void mergeWithoutComp( MatrixBlock out, MatrixBlock in, 
boolean appendOnly ) throws DMLRuntimeException {
                //pass through to matrix block operations
-               out.merge(in, appendOnly);      
+               out.merge(in, appendOnly);
        }
 
        /**
@@ -106,7 +101,7 @@ public abstract class ResultMerge
         * @param compare ?
         * @throws DMLRuntimeException if DMLRuntimeException occurs
         */
-       protected void mergeWithComp( MatrixBlock out, MatrixBlock in, 
double[][] compare ) 
+       protected void mergeWithComp( MatrixBlock out, MatrixBlock in, 
DenseBlock compare ) 
                throws DMLRuntimeException
        {
                //Notes for result correctness:
@@ -115,51 +110,48 @@ public abstract class ResultMerge
                // * Explicit NaN awareness because for cases were original 
matrix contains
                //   NaNs, since NaN != NaN, otherwise we would potentially 
overwrite results
                
-               if( in.isInSparseFormat() ) //sparse input format
-               {
+               if( in.isEmptyBlock(false) ) {
+                       for( int i=0; i<in.getNumRows(); i++ )
+                               for( int j=0; j<in.getNumColumns(); j++ )
+                                       if( compare.get(i, j) != 0 )
+                                               out.quickSetValue(i, j, 0);
+               }
+               else if( in.isInSparseFormat() ) { //SPARSE
                        int rows = in.getNumRows();
                        int cols = in.getNumColumns();
                        for( int i=0; i<rows; i++ )
-                               for( int j=0; j<cols; j++ )
-                               {       
-                                   double value = 
in.getValueSparseUnsafe(i,j);  //input value
-                                       if(   (value != compare[i][j] && 
!Double.isNaN(value) )     //for new values only (div)
-                                               || Double.isNaN(value) != 
Double.isNaN(compare[i][j]) ) //NaN awareness 
+                               for( int j=0; j<cols; j++ ) {
+                                       double value = 
in.getValueSparseUnsafe(i,j);  //input value
+                                       if(   (value != compare.get(i,j) && 
!Double.isNaN(value) )     //for new values only (div)
+                                               || Double.isNaN(value) != 
Double.isNaN(compare.get(i,j)) ) //NaN awareness 
                                        {
-                                       out.quickSetValue( i, j, value );       
+                                               out.quickSetValue(i, j, value);
                                        }
                                }
                }
-               else //dense input format
-               {
-                       //for a merge this case will seldom happen, as each 
input MatrixObject
-                       //has at most 1/numThreads of all values in it.
+               else { //DENSE
+                       //guaranteed allocated due to empty case above
+                       DenseBlock a = in.getDenseBlock();
                        int rows = in.getNumRows();
                        int cols = in.getNumColumns();
-                       for( int i=0; i<rows; i++ )
-                               for( int j=0; j<cols; j++ )
-                               {
-                                   double value = in.getValueDenseUnsafe(i,j); 
 //input value
-                                   if(    (value != compare[i][j] && 
!Double.isNaN(value) )    //for new values only (div)
-                                       || Double.isNaN(value) != 
Double.isNaN(compare[i][j]) ) //NaN awareness
-                                   {
-                                       out.quickSetValue( i, j, value );       
-                                   }
+                       for( int i=0; i<rows; i++ ) {
+                               double[] avals = a.values(i);
+                               int aix = a.pos(i);
+                               for( int j=0; j<cols; j++ ) {
+                                       double value = avals[aix+j]; //input 
value
+                                       if( (value != compare.get(i,j) && 
!Double.isNaN(value) ) //for new values only (div)
+                                               || Double.isNaN(value) != 
Double.isNaN(compare.get(i,j)) ) //NaN awareness
+                                       {
+                                               out.quickSetValue( i, j, value 
);
+                                       }
                                }
-               }       
+                       }
+               }
        }
 
-       protected long computeNonZeros( MatrixObject out, List<MatrixObject> in 
)
-       {
-               MatrixCharacteristics mc = out.getMatrixCharacteristics();
-               long outNNZ = mc.getNonZeros(); 
-               long ret = outNNZ;
-               for( MatrixObject tmp : in ) {
-                       MatrixCharacteristics tmpmc = 
tmp.getMatrixCharacteristics();
-                       long inNNZ = tmpmc.getNonZeros();
-                       ret +=  (inNNZ - outNNZ);
-               }
-               
-               return ret;
+       protected long computeNonZeros( MatrixObject out, List<MatrixObject> in 
) {
+               //sum of nnz of input (worker result) - output var existing nnz
+               return -(in.size() * 
out.getMatrixCharacteristics().getNonZeros())
+                       + in.stream().mapToLong(m -> 
m.getMatrixCharacteristics().getNonZeros()).sum();
        }
 }

http://git-wip-us.apache.org/repos/asf/systemml/blob/53014ddd/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalAutomatic.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalAutomatic.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalAutomatic.java
index fe190ed..fec05c7 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalAutomatic.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalAutomatic.java
@@ -29,11 +29,9 @@ import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
 
 public class ResultMergeLocalAutomatic extends ResultMerge
 {
-       
        private ResultMerge _rm = null;
        
-       public ResultMergeLocalAutomatic( MatrixObject out, MatrixObject[] in, 
String outputFilename )
-       {
+       public ResultMergeLocalAutomatic( MatrixObject out, MatrixObject[] in, 
String outputFilename ) {
                super( out, in, outputFilename );
        }
 
@@ -72,6 +70,6 @@ public class ResultMergeLocalAutomatic extends ResultMerge
                else
                        _rm = new ResultMergeLocalFile( _output, _inputs, 
_outputFName );
                
-               return _rm.executeParallelMerge(par);   
+               return _rm.executeParallelMerge(par);
        }
 }

http://git-wip-us.apache.org/repos/asf/systemml/blob/53014ddd/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalFile.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalFile.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalFile.java
index f245cbf..af77783 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalFile.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalFile.java
@@ -50,6 +50,7 @@ import 
org.apache.sysml.runtime.controlprogram.parfor.util.StagingFileUtils;
 import org.apache.sysml.runtime.io.IOUtilFunctions;
 import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
 import org.apache.sysml.runtime.matrix.MetaDataFormat;
+import org.apache.sysml.runtime.matrix.data.DenseBlock;
 import org.apache.sysml.runtime.matrix.data.IJV;
 import org.apache.sysml.runtime.matrix.data.InputInfo;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
@@ -70,9 +71,8 @@ import org.apache.sysml.runtime.util.MapReduceTool;
  */
 public class ResultMergeLocalFile extends ResultMerge
 {
-       
        //NOTE: if we allow simple copies, this might result in a scattered 
file and many MR tasks for subsequent jobs
-       public static final boolean ALLOW_COPY_CELLFILES = false;       
+       public static final boolean ALLOW_COPY_CELLFILES = false;
        
        //internal comparison matrix
        private IDSequence _seq = null;
@@ -681,11 +681,8 @@ public class ResultMergeLocalFile extends ResultMerge
                                                                throw new 
DMLRuntimeException("Unable to merge results because multiple compare blocks 
found.");
                                                        mb = 
LocalFileUtils.readMatrixBlockFromLocal( dir2+"/"+lnames2[0] );
                                                        boolean appendOnly = 
mb.isInSparseFormat();
-                                                       double[][] compare = 
DataConverter.convertToDoubleMatrix(mb);
-                                                       
-                                                       String[] lnames = 
dir.list();
-                                                       for( String lname : 
lnames )
-                                                       {
+                                                       DenseBlock compare = 
DataConverter.convertToDenseBlock(mb, false);
+                                                       for( String lname : 
dir.list() ) {
                                                                MatrixBlock tmp 
= LocalFileUtils.readMatrixBlockFromLocal( dir+"/"+lname );
                                                                
mergeWithComp(mb, tmp, compare);
                                                        }
@@ -700,21 +697,17 @@ public class ResultMergeLocalFile extends ResultMerge
                                                else //WITHOUT COMPARE BLOCK
                                                {
                                                        //copy all non-zeros 
from all workers
-                                                       String[] lnames = 
dir.list();
                                                        boolean appendOnly = 
false;
-                                                       for( String lname : 
lnames )
-                                                       {
-                                                               if( mb == null )
-                                                               {
+                                                       for( String lname : 
dir.list() ) {
+                                                               if( mb == null 
) {
                                                                        mb = 
LocalFileUtils.readMatrixBlockFromLocal( dir+"/"+lname );
                                                                        
appendOnly = mb.isInSparseFormat();
                                                                }
-                                                               else
-                                                               {
+                                                               else {
                                                                        
MatrixBlock tmp = LocalFileUtils.readMatrixBlockFromLocal( dir+"/"+lname );
                                                                        
mergeWithoutComp(mb, tmp, appendOnly);
                                                                }
-                                                       }       
+                                                       }
                                                        
                                                        //sort sparse due to 
append-only
                                                        if( appendOnly )
@@ -724,19 +717,17 @@ public class ResultMergeLocalFile extends ResultMerge
                                                        mb.examSparsity(); 
                                                }
                                        }
-                                       else
-                                       {
+                                       else {
                                                //NOTE: whenever runtime does 
not need all blocks anymore, this can be removed
                                                int maxRow = 
(int)(((brow-1)*brlen + brlen < rlen) ? brlen : rlen - (brow-1)*brlen);
                                                int maxCol = 
(int)(((bcol-1)*bclen + bclen < clen) ? bclen : clen - (bcol-1)*bclen);
-                               
                                                mb = new MatrixBlock(maxRow, 
maxCol, true);
-                                       }       
+                                       }
                                        
                                        //mb.examSparsity(); //done on write 
anyway and mb not reused
                                        indexes.setIndexes(brow, bcol);
                                        writer.append(indexes, mb);
-                               }       
+                               }
                }
                finally {
                        IOUtilFunctions.closeSilently(writer);
@@ -755,10 +746,8 @@ public class ResultMergeLocalFile extends ResultMerge
                long clen = mc.getCols();
                int brlen = mc.getRowsPerBlock();
                int bclen = mc.getColsPerBlock();
-                               
-               BufferedWriter out = new BufferedWriter(new 
OutputStreamWriter(fs.create(path,true)));          
-               try
-               {
+               
+               try( BufferedWriter out = new BufferedWriter(new 
OutputStreamWriter(fs.create(path,true))) ) {
                        //for obj reuse and preventing repeated buffer 
re-allocations
                        StringBuilder sb = new StringBuilder();
                        
@@ -784,11 +773,8 @@ public class ResultMergeLocalFile extends ResultMerge
                                                                throw new 
DMLRuntimeException("Unable to merge results because multiple compare blocks 
found.");
                                                        mb = 
StagingFileUtils.readCellList2BlockFromLocal( dir2+"/"+lnames2[0], brlen, bclen 
);
                                                        boolean appendOnly = 
mb.isInSparseFormat();
-                                                       double[][] compare = 
DataConverter.convertToDoubleMatrix(mb);
-                                                       
-                                                       String[] lnames = 
dir.list();
-                                                       for( String lname : 
lnames )
-                                                       {
+                                                       DenseBlock compare = 
DataConverter.convertToDenseBlock(mb, false);
+                                                       for( String lname : 
dir.list() ) {
                                                                MatrixBlock tmp 
= StagingFileUtils.readCellList2BlockFromLocal(  dir+"/"+lname, brlen, bclen );
                                                                
mergeWithComp(mb, tmp, compare);
                                                        }
@@ -803,17 +789,13 @@ public class ResultMergeLocalFile extends ResultMerge
                                                else //WITHOUT COMPARE BLOCK
                                                {
                                                        //copy all non-zeros 
from all workers
-                                                       String[] lnames = 
dir.list();
                                                        boolean appendOnly = 
false;
-                                                       for( String lname : 
lnames )
-                                                       {
-                                                               if( mb == null )
-                                                               {
+                                                       for( String lname : 
dir.list() ) {
+                                                               if( mb == null 
) {
                                                                        mb = 
StagingFileUtils.readCellList2BlockFromLocal( dir+"/"+lname, brlen, bclen );
                                                                        
appendOnly = mb.isInSparseFormat();
                                                                }
-                                                               else
-                                                               {
+                                                               else {
                                                                        
MatrixBlock tmp = StagingFileUtils.readCellList2BlockFromLocal(  dir+"/"+lname, 
brlen, bclen );
                                                                        
mergeWithoutComp(mb, tmp, appendOnly);
                                                                }
@@ -831,11 +813,9 @@ public class ResultMergeLocalFile extends ResultMerge
                                        //write the block to text cell
                                        if( mb!=null )
                                        {
-                                               if( mb.isInSparseFormat() )
-                                               {
+                                               if( mb.isInSparseFormat() ) {
                                                        Iterator<IJV> iter = 
mb.getSparseBlockIterator();
-                                                       while( iter.hasNext() )
-                                                       {
+                                                       while( iter.hasNext() ) 
{
                                                                IJV lcell = 
iter.next();
                                                                
sb.append(row_offset+lcell.getI());
                                                                sb.append(' ');
@@ -843,13 +823,12 @@ public class ResultMergeLocalFile extends ResultMerge
                                                                sb.append(' ');
                                                                
sb.append(lcell.getV());
                                                                sb.append('\n');
-                                                               out.write( 
sb.toString() ); 
+                                                               out.write( 
sb.toString() );
                                                                sb.setLength(0);
                                                                written = true;
-                                                       }                       
                                
+                                                       }
                                                }
-                                               else
-                                               {
+                                               else {
                                                        for( int i=0; i<brlen; 
i++ )
                                                                for( int j=0; 
j<bclen; j++ )
                                                                {
@@ -868,15 +847,12 @@ public class ResultMergeLocalFile extends ResultMerge
                                                                        }
                                                                }
                                                }
-                                       }                               
+                                       }
                                }       
                        
                        if( !written )
                                out.write("1 1 0\n");
                }
-               finally {
-                       IOUtilFunctions.closeSilently(out);
-               }
        }
 
        @SuppressWarnings("deprecation")
@@ -892,10 +868,9 @@ public class ResultMergeLocalFile extends ResultMerge
                long clen = mc.getCols();
                int brlen = mc.getRowsPerBlock();
                int bclen = mc.getColsPerBlock();
-                               
                
                MatrixIndexes indexes = new MatrixIndexes(1,1);
-               MatrixCell cell = new MatrixCell(0);    
+               MatrixCell cell = new MatrixCell(0);
                
                SequenceFile.Writer out = new SequenceFile.Writer(fs, job, 
path, MatrixIndexes.class, MatrixCell.class); //beware ca 50ms
                try
@@ -922,11 +897,8 @@ public class ResultMergeLocalFile extends ResultMerge
                                                                throw new 
DMLRuntimeException("Unable to merge results because multiple compare blocks 
found.");
                                                        mb = 
StagingFileUtils.readCellList2BlockFromLocal( dir2+"/"+lnames2[0], brlen, bclen 
);
                                                        boolean appendOnly = 
mb.isInSparseFormat();
-                                                       double[][] compare = 
DataConverter.convertToDoubleMatrix(mb);
-                                                       
-                                                       String[] lnames = 
dir.list();
-                                                       for( String lname : 
lnames )
-                                                       {
+                                                       DenseBlock compare = 
DataConverter.convertToDenseBlock(mb, false);
+                                                       for( String lname : 
dir.list() ) {
                                                                MatrixBlock tmp 
= StagingFileUtils.readCellList2BlockFromLocal(  dir+"/"+lname, brlen, bclen );
                                                                
mergeWithComp(mb, tmp, compare);
                                                        }
@@ -941,21 +913,17 @@ public class ResultMergeLocalFile extends ResultMerge
                                                else //WITHOUT COMPARE BLOCK
                                                {
                                                        //copy all non-zeros 
from all workers
-                                                       String[] lnames = 
dir.list();
                                                        boolean appendOnly = 
false;
-                                                       for( String lname : 
lnames )
-                                                       {
-                                                               if( mb == null )
-                                                               {
+                                                       for( String lname : 
dir.list() ) {
+                                                               if( mb == null 
) {
                                                                        mb = 
StagingFileUtils.readCellList2BlockFromLocal( dir+"/"+lname, brlen, bclen );
                                                                        
appendOnly = mb.isInSparseFormat();
                                                                }
-                                                               else
-                                                               {
+                                                               else {
                                                                        
MatrixBlock tmp = StagingFileUtils.readCellList2BlockFromLocal(  dir+"/"+lname, 
brlen, bclen );
                                                                        
mergeWithoutComp(mb, tmp, appendOnly);
                                                                }
-                                                       }       
+                                                       }
                                                        
                                                        //sort sparse due to 
append-only
                                                        if( appendOnly )
@@ -972,8 +940,7 @@ public class ResultMergeLocalFile extends ResultMerge
                                                if( mb.isInSparseFormat() )
                                                {
                                                        Iterator<IJV> iter = 
mb.getSparseBlockIterator();
-                                                       while( iter.hasNext() )
-                                                       {
+                                                       while( iter.hasNext() ) 
{
                                                                IJV lcell = 
iter.next();
                                                                
indexes.setIndexes(row_offset+lcell.getI(), col_offset+lcell.getJ());
                                                                
cell.setValue(lcell.getV());
@@ -996,7 +963,7 @@ public class ResultMergeLocalFile extends ResultMerge
                                                                        }
                                                                }
                                                }
-                                       }                               
+                                       }
                                }       
                        
                        if( !written )
@@ -1031,5 +998,4 @@ public class ResultMergeLocalFile extends ResultMerge
                        fs.rename(tmpPath, new 
Path(fnameNew+"/"+lname+seq.getNextID()));
                }
        }
-
 }

http://git-wip-us.apache.org/repos/asf/systemml/blob/53014ddd/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalMemory.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalMemory.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalMemory.java
index a31294e..991baca 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalMemory.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalMemory.java
@@ -27,6 +27,7 @@ import 
org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
 import 
org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
 import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
 import org.apache.sysml.runtime.matrix.MetaDataFormat;
+import org.apache.sysml.runtime.matrix.data.DenseBlock;
 import org.apache.sysml.runtime.matrix.data.InputInfo;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 import org.apache.sysml.runtime.matrix.data.OutputInfo;
@@ -40,13 +41,11 @@ import org.apache.sysml.runtime.util.DataConverter;
  * 
  */
 public class ResultMergeLocalMemory extends ResultMerge
-{      
-       
+{
        //internal comparison matrix
-       private double[][]        _compare     = null;
+       private DenseBlock _compare = null;
        
-       public ResultMergeLocalMemory( MatrixObject out, MatrixObject[] in, 
String outputFilename )
-       {
+       public ResultMergeLocalMemory( MatrixObject out, MatrixObject[] in, 
String outputFilename ) {
                super( out, in, outputFilename );
        }
        
@@ -73,7 +72,7 @@ public class ResultMergeLocalMemory extends ResultMerge
                        boolean appendOnly = outMBNew.isInSparseFormat();
                        
                        //create compare matrix if required (existing data in 
result)
-                       _compare = createCompareMatrix(outMB);
+                       _compare = getCompareMatrix(outMB);
                        if( _compare != null )
                                outMBNew.copy(outMB);
                        
@@ -88,7 +87,7 @@ public class ResultMergeLocalMemory extends ResultMerge
                                                LOG.trace("ResultMerge (local, 
in-memory): Merge input "+in.hashCode()+" (fname="+in.getFileName()+")");
                                        
                                        //read/pin input_i
-                                       MatrixBlock inMB = in.acquireRead();    
+                                       MatrixBlock inMB = in.acquireRead();
                                        
                                        //core merge 
                                        merge( outMBNew, inMB, appendOnly );
@@ -117,15 +116,13 @@ public class ResultMergeLocalMemory extends ResultMerge
                        outMBNew.examSparsity(); 
                        
                        //create output
-                       if( flagMerged )
-                       {               
+                       if( flagMerged ) {
                                //create new output matrix 
                                //(e.g., to prevent potential export<->read 
file access conflict in specific cases of 
                                // local-remote nested parfor))
-                               moNew = createNewMatrixObject( outMBNew );      
+                               moNew = createNewMatrixObject( outMBNew );
                        }
-                       else
-                       {
+                       else {
                                moNew = _output; //return old matrix, to 
prevent copy
                        }
                        
@@ -172,7 +169,7 @@ public class ResultMergeLocalMemory extends ResultMerge
                                outMBNew.allocateDenseBlockUnsafe((int)rows, 
(int)cols);
                                
                                //create compare matrix if required (existing 
data in result)
-                               _compare = createCompareMatrix(outMB);
+                               _compare = getCompareMatrix(outMB);
                                if( _compare != null )
                                        outMBNew.copy(outMB);
                                
@@ -201,10 +198,9 @@ public class ResultMergeLocalMemory extends ResultMerge
                                //create new output matrix 
                                //(e.g., to prevent potential export<->read 
file access conflict in specific cases of 
                                // local-remote nested parfor))
-                               moNew = createNewMatrixObject( outMBNew );      
+                               moNew = createNewMatrixObject( outMBNew );
                        }
-                       else
-                       {
+                       else {
                                moNew = _output; //return old matrix, to 
prevent copy
                        }
                        
@@ -220,10 +216,10 @@ public class ResultMergeLocalMemory extends ResultMerge
                return moNew;
        }
 
-       private static double[][] createCompareMatrix( MatrixBlock output ) {
+       private static DenseBlock getCompareMatrix( MatrixBlock output ) {
                //create compare matrix only if required
-               if( output.getNonZeros() > 0 )
-                       return DataConverter.convertToDoubleMatrix( output );
+               if( !output.isEmptyBlock(false) )
+                       return DataConverter.convertToDenseBlock(output, false);
                return null;
        }
 

http://git-wip-us.apache.org/repos/asf/systemml/blob/53014ddd/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteReducer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteReducer.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteReducer.java
index 7d59230..e12a052 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteReducer.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteReducer.java
@@ -34,6 +34,8 @@ import org.apache.hadoop.mapred.Reporter;
 
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
+import 
org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
+import org.apache.sysml.runtime.matrix.data.DenseBlock;
 import org.apache.sysml.runtime.matrix.data.InputInfo;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 import org.apache.sysml.runtime.matrix.data.MatrixCell;
@@ -52,13 +54,11 @@ import org.apache.sysml.runtime.util.DataConverter;
  */
 public class ResultMergeRemoteReducer 
        implements Reducer<Writable, Writable, Writable, Writable>
-{      
-       
+{
        private ResultMergeReducer _reducer = null;
        
-       public ResultMergeRemoteReducer( ) 
-       {
-               
+       public ResultMergeRemoteReducer( ) {
+               //do nothing
        }
        
        @Override
@@ -83,20 +83,19 @@ public class ResultMergeRemoteReducer
                else if( ii == InputInfo.BinaryCellInputInfo )
                        _reducer = new 
ResultMergeReducerBinaryCell(requiresCompare);
                else if( ii == InputInfo.BinaryBlockInputInfo )
-                       _reducer = new 
ResultMergeReducerBinaryBlock(requiresCompare);
+                       _reducer = new 
ResultMergeReducerBinaryBlock(requiresCompare, job);
                else
                        throw new RuntimeException("Unable to configure mapper 
with unknown input info: "+ii.toString());
        }
 
        @Override
-       public void close() throws IOException 
-       {
+       public void close() throws IOException {
                //do nothing
        }
 
        
        private interface ResultMergeReducer //interface in order to allow 
ResultMergeReducerBinaryBlock to inherit from ResultMerge
-       {       
+       {
                void processKeyValueList( Writable key, Iterator<Writable> 
valueList, OutputCollector<Writable, Writable> out, Reporter reporter ) 
                        throws IOException;
        }
@@ -107,8 +106,7 @@ public class ResultMergeRemoteReducer
                private StringBuilder _sb = null;
                private Text _objValue = null;
                
-               public ResultMergeReducerTextCell(boolean requiresCompare)
-               {
+               public ResultMergeReducerTextCell(boolean requiresCompare) {
                        _requiresCompare = requiresCompare;
                        _sb = new StringBuilder();
                        _objValue = new Text();
@@ -152,7 +150,7 @@ public class ResultMergeRemoteReducer
                                                        _sb.append(lvalue);
                                                        _objValue.set( 
_sb.toString() );
                                                        _sb.setLength(0);
-                                                       
out.collect(NullWritable.get(), _objValue );    
+                                                       
out.collect(NullWritable.get(), _objValue );
                                                        found = true;
                                                        break; //only one write 
per cell possible (independence)
                                                }// note: objs with equal value 
are directly discarded
@@ -170,8 +168,8 @@ public class ResultMergeRemoteReducer
                                                        _sb.append(' ');
                                                        
_sb.append(c.doubleValue());
                                                        _objValue.set( 
_sb.toString() );
-                                                       _sb.setLength(0);       
                                                
-                                                       
out.collect(NullWritable.get(), _objValue );    
+                                                       _sb.setLength(0);
+                                                       
out.collect(NullWritable.get(), _objValue );
                                                        break; //only one write 
per cell possible (independence)
                                                }
                        }
@@ -179,7 +177,7 @@ public class ResultMergeRemoteReducer
                        else
                        {
                                MatrixIndexes key2 = (MatrixIndexes) key;
-                               while( valueList.hasNext() )  
+                               while( valueList.hasNext() )
                                {
                                        TaggedMatrixCell tVal = 
(TaggedMatrixCell) valueList.next(); 
                                        MatrixCell value = (MatrixCell) 
tVal.getBaseObject();
@@ -190,13 +188,11 @@ public class ResultMergeRemoteReducer
                                        _sb.append(' ');
                                        _sb.append(value.getValue());
                                        _objValue.set( _sb.toString() );
-                                       _sb.setLength(0);                       
        
-                                       out.collect(NullWritable.get(), 
_objValue );    
+                                       _sb.setLength(0);
+                                       out.collect(NullWritable.get(), 
_objValue );
                                        break; //only one write per cell 
possible (independence)
                                }
                        }
-                       
-                       
                }
        }
        
@@ -205,8 +201,7 @@ public class ResultMergeRemoteReducer
                private boolean _requiresCompare;
                private MatrixCell _objValue;
                
-               public ResultMergeReducerBinaryCell(boolean requiresCompare)
-               {
+               public ResultMergeReducerBinaryCell(boolean requiresCompare) {
                        _requiresCompare = requiresCompare;
                        _objValue = new MatrixCell();
                }
@@ -241,7 +236,7 @@ public class ResultMergeRemoteReducer
                                                        cellList.add( 
cVal.getValue() );
                                                else if( 
cellCompare.doubleValue() != cVal.getValue() ) //compare on the fly
                                                {
-                                                       out.collect(key, cVal 
);        
+                                                       out.collect(key, cVal );
                                                        found = true;
                                                        break; //only one write 
per cell possible (independence)
                                                }// note: objs with equal value 
are directly discarded
@@ -250,21 +245,19 @@ public class ResultMergeRemoteReducer
                                
                                //result merge for objs before compare
                                if( !found )
-                                       for( Double c : cellList )              
                
-                                               if( !c.equals( cellCompare) )
-                                               {                               
+                                       for( Double c : cellList )
+                                               if( !c.equals( cellCompare) ) {
                                                        
_objValue.setValue(c.doubleValue());
-                                                       out.collect(key, 
_objValue );   
+                                                       out.collect(key, 
_objValue );
                                                        break; //only one write 
per cell possible (independence)
                                                }
                        }
                        //without compare
                        else
                        {
-                               while( valueList.hasNext() )  
-                               {
-                                       TaggedMatrixCell tVal = 
(TaggedMatrixCell) valueList.next(); 
-                                       out.collect((MatrixIndexes)key, 
(MatrixCell)tVal.getBaseObject());      
+                               while( valueList.hasNext() ) {
+                                       TaggedMatrixCell tVal = 
(TaggedMatrixCell) valueList.next();
+                                       out.collect((MatrixIndexes)key, 
(MatrixCell)tVal.getBaseObject());
                                        break; //only one write per cell 
possible (independence)
                                }
                        }
@@ -274,23 +267,20 @@ public class ResultMergeRemoteReducer
        private static class ResultMergeReducerBinaryBlock extends ResultMerge 
implements ResultMergeReducer
        {
                private boolean _requiresCompare;
+               private JobConf _job = null;
                
-               public ResultMergeReducerBinaryBlock(boolean requiresCompare)
-               {
+               public ResultMergeReducerBinaryBlock(boolean requiresCompare, 
JobConf job) {
                        _requiresCompare = requiresCompare;
+                       _job = job;
                }
                
                @Override
-               public MatrixObject executeParallelMerge(int par) 
-                       throws DMLRuntimeException 
-               {
+               public MatrixObject executeParallelMerge(int par) throws 
DMLRuntimeException {
                        throw new DMLRuntimeException("Unsupported operation.");
                }
 
                @Override
-               public MatrixObject executeSerialMerge() 
-                       throws DMLRuntimeException 
-               {
+               public MatrixObject executeSerialMerge() throws 
DMLRuntimeException {
                        throw new DMLRuntimeException("Unsupported operation.");
                }
 
@@ -302,37 +292,33 @@ public class ResultMergeRemoteReducer
                        {
                                MatrixIndexes ixOut = 
((ResultMergeTaggedMatrixIndexes)key).getIndexes();
                                MatrixBlock mbOut = null;
-                               double[][] aCompare = null;
+                               DenseBlock aCompare = null;
                                boolean appendOnly = false;
                                
                                //get and prepare compare block if required
-                               if( _requiresCompare )
-                               {
+                               if( _requiresCompare ) {
                                        TaggedMatrixBlock tVal = 
(TaggedMatrixBlock) valueList.next();
                                        MatrixBlock bVal = (MatrixBlock) 
tVal.getBaseObject();
                                        if( 
tVal.getTag()!=ResultMergeRemoteMR.COMPARE_TAG )
                                                throw new IOException("Failed 
to read compare block at expected first position.");
-                                       aCompare = 
DataConverter.convertToDoubleMatrix(bVal);
+                                       aCompare = 
DataConverter.convertToDenseBlock(bVal,
+                                               
InfrastructureAnalyzer.isLocalMode(_job));
                                }
                                
                                //merge all result blocks into final result 
block 
-                               while( valueList.hasNext() ) 
-                               {
+                               while( valueList.hasNext() ) {
                                        TaggedMatrixBlock tVal = 
(TaggedMatrixBlock) valueList.next();
                                        MatrixBlock bVal = (MatrixBlock) 
tVal.getBaseObject();
-                                       
-                                       if( mbOut == null ) //copy first block
-                                       {
+                                       if( mbOut == null ) { //copy first block
                                                mbOut = new MatrixBlock();
                                                mbOut.copy( bVal );
                                                appendOnly = 
mbOut.isInSparseFormat();
                                        }
-                                       else //merge remaining blocks
-                                       {
+                                       else { //merge remaining blocks
                                                if( _requiresCompare )
                                                        mergeWithComp(mbOut, 
bVal, aCompare);
                                                else
-                                                       mergeWithoutComp(mbOut, 
bVal, appendOnly);      
+                                                       mergeWithoutComp(mbOut, 
bVal, appendOnly);
                                        }
                                }
                                
@@ -348,8 +334,7 @@ public class ResultMergeRemoteReducer
                        catch( Exception ex )
                        {
                                throw new IOException(ex);
-                       }                       
+                       }
                }
-               
        }
 }

http://git-wip-us.apache.org/repos/asf/systemml/blob/53014ddd/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteSparkWCompare.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteSparkWCompare.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteSparkWCompare.java
index 007f1ac..27cc894 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteSparkWCompare.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteSparkWCompare.java
@@ -27,13 +27,13 @@ import scala.Tuple2;
 
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
+import org.apache.sysml.runtime.matrix.data.DenseBlock;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
 import org.apache.sysml.runtime.util.DataConverter;
 
 public class ResultMergeRemoteSparkWCompare extends ResultMerge implements 
PairFunction<Tuple2<MatrixIndexes,Tuple2<Iterable<MatrixBlock>,MatrixBlock>>, 
MatrixIndexes, MatrixBlock>
 {
-       
        private static final long serialVersionUID = -5970805069405942836L;
        
        @Override
@@ -45,7 +45,7 @@ public class ResultMergeRemoteSparkWCompare extends 
ResultMerge implements PairF
                MatrixBlock cin = arg._2()._2();
                
                //create compare array
-               double[][] compare = DataConverter.convertToDoubleMatrix(cin);
+               DenseBlock compare = DataConverter.convertToDenseBlock(cin, 
false);
                
                //merge all blocks into compare block
                MatrixBlock out = new MatrixBlock(cin);
@@ -57,16 +57,12 @@ public class ResultMergeRemoteSparkWCompare extends 
ResultMerge implements PairF
        }
 
        @Override
-       public MatrixObject executeSerialMerge() 
-                       throws DMLRuntimeException 
-       {
+       public MatrixObject executeSerialMerge() throws DMLRuntimeException {
                throw new DMLRuntimeException("Unsupported operation.");
        }
 
        @Override
-       public MatrixObject executeParallelMerge(int par)
-                       throws DMLRuntimeException 
-       {
+       public MatrixObject executeParallelMerge(int par) throws 
DMLRuntimeException {
                throw new DMLRuntimeException("Unsupported operation.");
        }
 }

http://git-wip-us.apache.org/repos/asf/systemml/blob/53014ddd/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeTaggedMatrixIndexes.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeTaggedMatrixIndexes.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeTaggedMatrixIndexes.java
index 5ac1a79..962a327 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeTaggedMatrixIndexes.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeTaggedMatrixIndexes.java
@@ -38,35 +38,27 @@ import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
  */
 public class ResultMergeTaggedMatrixIndexes implements 
WritableComparable<ResultMergeTaggedMatrixIndexes>
 {
-       
        private MatrixIndexes _ix;
        private byte _tag = -1;
        
-       public ResultMergeTaggedMatrixIndexes()
-       {
+       public ResultMergeTaggedMatrixIndexes() {
                _ix = new MatrixIndexes();
        }
 
-       public MatrixIndexes getIndexes()
-       {
+       public MatrixIndexes getIndexes() {
                return _ix;
        }
        
-       
-       public byte getTag()
-       {
+       public byte getTag() {
                return _tag;
        }
        
-       public void setTag(byte tag)
-       {
+       public void setTag(byte tag) {
                _tag = tag;
        }
 
        @Override
-       public void readFields(DataInput in) 
-               throws IOException 
-       {
+       public void readFields(DataInput in) throws IOException {
                if( _ix == null )
                        _ix = new MatrixIndexes();
                _ix.readFields(in);
@@ -74,33 +66,24 @@ public class ResultMergeTaggedMatrixIndexes implements 
WritableComparable<Result
        }
 
        @Override
-       public void write(DataOutput out) 
-               throws IOException 
-       {
+       public void write(DataOutput out) throws IOException {
                _ix.write(out);
                out.writeByte(_tag);
        }
 
        @Override
-       public int compareTo(ResultMergeTaggedMatrixIndexes that) 
-       {
+       public int compareTo(ResultMergeTaggedMatrixIndexes that) {
                int ret = _ix.compareTo(that._ix);
-               
                if( ret == 0 )
-               {
                        ret = ((_tag == that._tag) ? 0 : 
-                                  (_tag < that._tag)? -1 : 1);
-               }
-               
+                               (_tag < that._tag)? -1 : 1);
                return ret;
        }
        
        @Override
-       public boolean equals(Object other) 
-       {
+       public boolean equals(Object other) {
                if( !(other instanceof ResultMergeTaggedMatrixIndexes) )
                        return false;
-               
                ResultMergeTaggedMatrixIndexes that = 
(ResultMergeTaggedMatrixIndexes)other;
                return (_ix.equals(that._ix) && _tag == that._tag);
        }

http://git-wip-us.apache.org/repos/asf/systemml/blob/53014ddd/src/main/java/org/apache/sysml/runtime/util/DataConverter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/util/DataConverter.java b/src/main/java/org/apache/sysml/runtime/util/DataConverter.java
index 5c3ad25..284e783 100644
--- a/src/main/java/org/apache/sysml/runtime/util/DataConverter.java
+++ b/src/main/java/org/apache/sysml/runtime/util/DataConverter.java
@@ -339,7 +339,7 @@ public class DataConverter
                                Iterator<IJV> iter = 
mb.getSparseBlockIterator();
                                while( iter.hasNext() ) {
                                        IJV cell = iter.next();
-                                       ret.set(cell.getI(), cols+cell.getJ(), 
cell.getV());
+                                       ret.set(cell.getI(), cell.getJ(), 
cell.getV());
                                }
                        }
                        else if( deep ) {

Reply via email to