Repository: incubator-systemml
Updated Branches:
  refs/heads/master 54cad60c2 -> f5b78a322


[SYSTEMML-1517] Improved memory handling compressed matrix-vector mult

This patch improves the temporary memory handling of compressed
matrix-vector and vector-matrix multiplication. So far, we repeatedly
allocated double and int vectors sized by the number of values per
column group, which caused unnecessary garbage collection overhead on
OpenJDK. We now allocate these vectors once for all column groups,
sized by the maximum number of values, and each column group simply
resets and uses a subset of this temporary memory.


Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/f5b78a32
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/f5b78a32
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/f5b78a32

Branch: refs/heads/master
Commit: f5b78a3222cedeafb582cbf728f30522dbcd0daf
Parents: 54cad60
Author: Matthias Boehm <mboe...@gmail.com>
Authored: Wed Apr 12 20:42:34 2017 -0700
Committer: Matthias Boehm <mboe...@gmail.com>
Committed: Wed Apr 12 20:45:57 2017 -0700

----------------------------------------------------------------------
 .../sysml/runtime/compress/ColGroupDDC1.java    |  4 +-
 .../sysml/runtime/compress/ColGroupDDC2.java    |  2 +-
 .../sysml/runtime/compress/ColGroupOLE.java     |  6 +--
 .../sysml/runtime/compress/ColGroupRLE.java     |  6 +--
 .../sysml/runtime/compress/ColGroupValue.java   | 56 +++++++++++++++++++-
 .../runtime/compress/CompressedMatrixBlock.java | 39 +++++++++++---
 6 files changed, 94 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/f5b78a32/src/main/java/org/apache/sysml/runtime/compress/ColGroupDDC1.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/ColGroupDDC1.java 
b/src/main/java/org/apache/sysml/runtime/compress/ColGroupDDC1.java
index 4db871f..82adb55 100644
--- a/src/main/java/org/apache/sysml/runtime/compress/ColGroupDDC1.java
+++ b/src/main/java/org/apache/sysml/runtime/compress/ColGroupDDC1.java
@@ -223,7 +223,7 @@ public class ColGroupDDC1 extends ColGroupDDC
                                sb[j] = b[grps[i]._colIndexes[j]];
                        }       
                        //pre-aggregate all distinct values (guaranteed <=255)
-                       vals[i] = grps[i].preaggValues(grps[i].getNumValues(), 
sb);
+                       vals[i] = grps[i].preaggValues(grps[i].getNumValues(), 
sb, true);
                }
                
                //cache-conscious matrix-vector multiplication
@@ -247,7 +247,7 @@ public class ColGroupDDC1 extends ColGroupDDC
                {
                        //iterative over codes and pre-aggregate inputs per 
code (guaranteed <=255)
                        //temporary array also avoids false sharing in 
multi-threaded environments
-                       double[] vals = new double[numVals];
+                       double[] vals = allocDVector(numVals, true);
                        for( int i=0; i<nrow; i++ ) {
                                vals[_data[i]&0xFF] += a[i];
                        }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/f5b78a32/src/main/java/org/apache/sysml/runtime/compress/ColGroupDDC2.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/ColGroupDDC2.java 
b/src/main/java/org/apache/sysml/runtime/compress/ColGroupDDC2.java
index 5f29979..66ba149 100644
--- a/src/main/java/org/apache/sysml/runtime/compress/ColGroupDDC2.java
+++ b/src/main/java/org/apache/sysml/runtime/compress/ColGroupDDC2.java
@@ -221,7 +221,7 @@ public class ColGroupDDC2 extends ColGroupDDC
                {
                        //iterative over codes and pre-aggregate inputs per code
                        //temporary array also avoids false sharing in 
multi-threaded environments
-                       double[] vals = new double[numVals];
+                       double[] vals = allocDVector(numVals, true);
                        for( int i=0; i<nrow; i++ ) {
                                vals[_data[i]] += a[i];
                        }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/f5b78a32/src/main/java/org/apache/sysml/runtime/compress/ColGroupOLE.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/ColGroupOLE.java 
b/src/main/java/org/apache/sysml/runtime/compress/ColGroupOLE.java
index f47a432..80e53d4 100644
--- a/src/main/java/org/apache/sysml/runtime/compress/ColGroupOLE.java
+++ b/src/main/java/org/apache/sysml/runtime/compress/ColGroupOLE.java
@@ -380,8 +380,8 @@ public class ColGroupOLE extends ColGroupOffset
                        //step 1: prepare position and value arrays
                        
                        //current pos per OLs / output values
-                       int[] apos = new int[numVals];
-                       double[] cvals = new double[numVals];
+                       int[] apos = allocIVector(numVals, true);
+                       double[] cvals = allocDVector(numVals, true);
                        
                        //step 2: cache conscious matrix-vector via horizontal 
scans 
                        for( int ai=0; ai<n; ai+=blksz2 ) 
@@ -687,7 +687,7 @@ public class ColGroupOLE extends ColGroupOffset
         * @return array of positions for all values
         */
        private int[] skipScan(int numVals, int rl) {
-               int[] ret = new int[numVals];
+               int[] ret = allocIVector(numVals, rl==0);
                final int blksz = BitmapEncoder.BITMAP_BLOCK_SZ;
                
                if( rl > 0 ) { //rl aligned with blksz          

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/f5b78a32/src/main/java/org/apache/sysml/runtime/compress/ColGroupRLE.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/ColGroupRLE.java 
b/src/main/java/org/apache/sysml/runtime/compress/ColGroupRLE.java
index 6d2ec43..63786d0 100644
--- a/src/main/java/org/apache/sysml/runtime/compress/ColGroupRLE.java
+++ b/src/main/java/org/apache/sysml/runtime/compress/ColGroupRLE.java
@@ -349,9 +349,9 @@ public class ColGroupRLE extends ColGroupOffset
                        //step 1: prepare position and value arrays
                        
                        //current pos per OLs / output values
-                       int[] apos = new int[numVals];
                        int[] astart = new int[numVals];
-                       double[] cvals = new double[numVals];
+                       int[] apos = allocIVector(numVals, true);
+                       double[] cvals = allocDVector(numVals, true);
                        
                        //step 2: cache conscious matrix-vector via horizontal 
scans 
                        for( int ai=0; ai<n; ai+=blksz ) 
@@ -677,7 +677,7 @@ public class ColGroupRLE extends ColGroupOffset
         * @return array of positions for all values
         */
        private int[] skipScan(int numVals, int rl, int[] astart) {
-               int[] apos = new int[numVals]; 
+               int[] apos = allocIVector(numVals, rl==0);
                
                if( rl > 0 ) { //rl aligned with blksz  
                        for (int k = 0; k < numVals; k++) {

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/f5b78a32/src/main/java/org/apache/sysml/runtime/compress/ColGroupValue.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/ColGroupValue.java 
b/src/main/java/org/apache/sysml/runtime/compress/ColGroupValue.java
index 928296d..d8870a4 100644
--- a/src/main/java/org/apache/sysml/runtime/compress/ColGroupValue.java
+++ b/src/main/java/org/apache/sysml/runtime/compress/ColGroupValue.java
@@ -28,6 +28,7 @@ import org.apache.sysml.runtime.functionobjects.KahanFunction;
 import org.apache.sysml.runtime.functionobjects.KahanPlus;
 import org.apache.sysml.runtime.instructions.cp.KahanObject;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+import org.apache.sysml.runtime.matrix.data.Pair;
 import org.apache.sysml.runtime.matrix.operators.AggregateUnaryOperator;
 import org.apache.sysml.runtime.matrix.operators.ScalarOperator;
 
@@ -46,7 +47,11 @@ public abstract class ColGroupValue extends ColGroup
        //slight performance decrease for parallel incl multi-threaded, hence 
not applied for
        //distributed operations (also because compression time + garbage 
collection increases)
        public static final boolean SORT_VALUES_BY_LENGTH = true; 
-               
+       
+       //thread-local pairs of reusable temporary vectors for positions and 
values
+       private static ThreadLocal<Pair<int[], double[]>> memPool = new 
ThreadLocal<Pair<int[], double[]>>() {
+               @Override protected Pair<int[], double[]> initialValue() { 
return new Pair<int[], double[]>(); }
+       };
        
        /** Distinct values associated with individual bitmaps. */
        protected double[] _values; //linearized <numcol vals> <numcol vals>
@@ -184,7 +189,12 @@ public abstract class ColGroupValue extends ColGroup
        }
 
        protected final double[] preaggValues(int numVals, double[] b) {
-               double[] ret = new double[numVals];
+               return preaggValues(numVals, b, false);
+       }
+       
+       protected final double[] preaggValues(int numVals, double[] b, boolean 
allocNew) {
+               double[] ret = allocNew ? new double[numVals] : 
+                       allocDVector(numVals, false);
                for( int k = 0; k < numVals; k++ )
                        ret[k] = sumValues(k, b);
                
@@ -300,4 +310,46 @@ public abstract class ColGroupValue extends ColGroup
         */
        public abstract void unaryAggregateOperations(AggregateUnaryOperator 
op, MatrixBlock result, int rl, int ru)
                throws DMLRuntimeException;
+       
+
+       //dynamic memory management
+       
+       public static void setupThreadLocalMemory(int len) {
+               Pair<int[], double[]> p = new Pair<int[], double[]>();
+               p.setKey(new int[len]);
+               p.setValue(new double[len]);
+               memPool.set(p);
+       }
+       
+       public static void cleanupThreadLocalMemory() {
+               memPool.remove();
+       }
+       
+       protected static double[] allocDVector(int len, boolean reset) {
+               Pair<int[], double[]> p = memPool.get();
+               
+               //sanity check for missing setup
+               if( p.getValue() == null )
+                       return new double[len];
+               
+               //get and reset if necessary
+               double[] tmp = p.getValue();
+               if( reset )
+                       Arrays.fill(tmp, 0, len, 0);
+               return tmp;
+       }
+       
+       protected static int[] allocIVector(int len, boolean reset) {
+               Pair<int[], double[]> p = memPool.get();
+               
+               //sanity check for missing setup
+               if( p.getKey() == null )
+                       return new int[len];
+               
+               //get and reset if necessary
+               int[] tmp = p.getKey();
+               if( reset )
+                       Arrays.fill(tmp, 0, len, 0);
+               return tmp;
+       }
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/f5b78a32/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java 
b/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
index 7faed54..ead0b19 100644
--- a/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
@@ -1358,6 +1358,8 @@ public class CompressedMatrixBlock extends MatrixBlock 
implements Externalizable
        private static void rightMultByVector(ArrayList<ColGroup> groups, 
MatrixBlock vect, MatrixBlock ret, boolean inclUC, int rl, int ru) 
                throws DMLRuntimeException 
        {
+               ColGroupValue.setupThreadLocalMemory(getMaxNumValues(groups));
+               
                boolean cacheDDC1 = ColGroupValue.LOW_LEVEL_OPT 
                        && ru-rl > ColGroupOffset.WRITE_CACHE_BLKSZ;
                
@@ -1383,6 +1385,8 @@ public class CompressedMatrixBlock extends MatrixBlock 
implements Externalizable
                        if( !(grp instanceof ColGroupUncompressed) 
                                && !(cacheDDC1 && grp instanceof ColGroupDDC1) )
                                grp.rightMultByVector(vect, ret, rl, ru);
+               
+               ColGroupValue.cleanupThreadLocalMemory();
        }
        
        /**
@@ -1412,12 +1416,16 @@ public class CompressedMatrixBlock extends MatrixBlock 
implements Externalizable
                result.reset();
                result.allocateDenseBlock();
                
+               // setup memory pool for reuse
+               
ColGroupValue.setupThreadLocalMemory(getMaxNumValues(colGroups));
+               
                // delegate matrix-vector operation to each column group
                for (ColGroup grp : colGroups) {                        
                        grp.leftMultByRowVector(rowVector, result);
                }
-
+               
                // post-processing
+               ColGroupValue.cleanupThreadLocalMemory();
                result.recomputeNonZeros();
        }
        
@@ -1457,10 +1465,10 @@ public class CompressedMatrixBlock extends MatrixBlock 
implements Externalizable
                        
                        //compute remaining compressed column groups in parallel
                        ExecutorService pool = Executors.newFixedThreadPool( 
Math.min(colGroups.size()-((uc!=null)?1:0), k) );
+                       ArrayList<ColGroup>[] grpParts = 
createStaticTaskPartitioning(4*k, false);
                        ArrayList<LeftMatrixMultTask> tasks = new 
ArrayList<LeftMatrixMultTask>();
-                       for( ColGroup grp : colGroups )
-                               if( !(grp instanceof ColGroupUncompressed) )
-                                       tasks.add(new LeftMatrixMultTask(grp, 
rowVector, result));
+                       for( ArrayList<ColGroup> groups : grpParts )
+                               tasks.add(new LeftMatrixMultTask(groups, 
rowVector, result));
                        List<Future<Object>> ret = pool.invokeAll(tasks);       
                        pool.shutdown();
                        for( Future<Object> tmp : ret )
@@ -1535,6 +1543,15 @@ public class CompressedMatrixBlock extends MatrixBlock 
implements Externalizable
                
                return grpParts;
        }
+       
+       private static int getMaxNumValues(List<ColGroup> groups) {
+               int numVals = 1;
+               for( ColGroup grp : groups )
+                       if( grp instanceof ColGroupValue )
+                               numVals = Math.max(numVals, 
+                                       ((ColGroupValue)grp).getNumValues());
+               return numVals;
+       }
 
        private ColGroupUncompressed getUncompressedColGroup()
        {
@@ -1547,12 +1564,12 @@ public class CompressedMatrixBlock extends MatrixBlock 
implements Externalizable
 
        private static class LeftMatrixMultTask implements Callable<Object> 
        {
-               private final ColGroup _group;
+               private final ArrayList<ColGroup> _groups;
                private final MatrixBlock _vect;
                private final MatrixBlock _ret;
                
-               protected LeftMatrixMultTask( ColGroup group, MatrixBlock vect, 
MatrixBlock ret)  {
-                       _group = group;
+               protected LeftMatrixMultTask( ArrayList<ColGroup> groups, 
MatrixBlock vect, MatrixBlock ret)  {
+                       _groups = groups;
                        _vect = vect;
                        _ret = ret;
                }
@@ -1560,8 +1577,14 @@ public class CompressedMatrixBlock extends MatrixBlock 
implements Externalizable
                @Override
                public Object call() throws DMLRuntimeException 
                {
+                       // setup memory pool for reuse
+                       
ColGroupValue.setupThreadLocalMemory(getMaxNumValues(_groups));
+                       
                        // delegate matrix-vector operation to each column group
-                       _group.leftMultByRowVector(_vect, _ret);
+                       for(ColGroup grp : _groups)
+                               grp.leftMultByRowVector(_vect, _ret);
+                       
+                       ColGroupValue.cleanupThreadLocalMemory();
                        return null;
                }
        }

Reply via email to