Repository: incubator-systemml Updated Branches: refs/heads/master 54cad60c2 -> f5b78a322
[SYSTEMML-1517] Improved memory handling for compressed matrix-vector mult This patch improves the temporary memory handling of compressed matrix-vector and vector-matrix multiplication. So far, we repeatedly allocated double and int vectors sized by the number of values per column group, which caused unnecessary garbage collection overhead on OpenJDK. We now allocate these vectors once for all column groups, sized to the maximum number of values, and each column group simply resets and uses a subset of this temporary memory. Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/f5b78a32 Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/f5b78a32 Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/f5b78a32 Branch: refs/heads/master Commit: f5b78a3222cedeafb582cbf728f30522dbcd0daf Parents: 54cad60 Author: Matthias Boehm <mboe...@gmail.com> Authored: Wed Apr 12 20:42:34 2017 -0700 Committer: Matthias Boehm <mboe...@gmail.com> Committed: Wed Apr 12 20:45:57 2017 -0700 ---------------------------------------------------------------------- .../sysml/runtime/compress/ColGroupDDC1.java | 4 +- .../sysml/runtime/compress/ColGroupDDC2.java | 2 +- .../sysml/runtime/compress/ColGroupOLE.java | 6 +-- .../sysml/runtime/compress/ColGroupRLE.java | 6 +-- .../sysml/runtime/compress/ColGroupValue.java | 56 +++++++++++++++++++- .../runtime/compress/CompressedMatrixBlock.java | 39 +++++++++++--- 6 files changed, 94 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/f5b78a32/src/main/java/org/apache/sysml/runtime/compress/ColGroupDDC1.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/compress/ColGroupDDC1.java b/src/main/java/org/apache/sysml/runtime/compress/ColGroupDDC1.java index 
4db871f..82adb55 100644 --- a/src/main/java/org/apache/sysml/runtime/compress/ColGroupDDC1.java +++ b/src/main/java/org/apache/sysml/runtime/compress/ColGroupDDC1.java @@ -223,7 +223,7 @@ public class ColGroupDDC1 extends ColGroupDDC sb[j] = b[grps[i]._colIndexes[j]]; } //pre-aggregate all distinct values (guaranteed <=255) - vals[i] = grps[i].preaggValues(grps[i].getNumValues(), sb); + vals[i] = grps[i].preaggValues(grps[i].getNumValues(), sb, true); } //cache-conscious matrix-vector multiplication @@ -247,7 +247,7 @@ public class ColGroupDDC1 extends ColGroupDDC { //iterative over codes and pre-aggregate inputs per code (guaranteed <=255) //temporary array also avoids false sharing in multi-threaded environments - double[] vals = new double[numVals]; + double[] vals = allocDVector(numVals, true); for( int i=0; i<nrow; i++ ) { vals[_data[i]&0xFF] += a[i]; } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/f5b78a32/src/main/java/org/apache/sysml/runtime/compress/ColGroupDDC2.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/compress/ColGroupDDC2.java b/src/main/java/org/apache/sysml/runtime/compress/ColGroupDDC2.java index 5f29979..66ba149 100644 --- a/src/main/java/org/apache/sysml/runtime/compress/ColGroupDDC2.java +++ b/src/main/java/org/apache/sysml/runtime/compress/ColGroupDDC2.java @@ -221,7 +221,7 @@ public class ColGroupDDC2 extends ColGroupDDC { //iterative over codes and pre-aggregate inputs per code //temporary array also avoids false sharing in multi-threaded environments - double[] vals = new double[numVals]; + double[] vals = allocDVector(numVals, true); for( int i=0; i<nrow; i++ ) { vals[_data[i]] += a[i]; } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/f5b78a32/src/main/java/org/apache/sysml/runtime/compress/ColGroupOLE.java ---------------------------------------------------------------------- diff --git 
a/src/main/java/org/apache/sysml/runtime/compress/ColGroupOLE.java b/src/main/java/org/apache/sysml/runtime/compress/ColGroupOLE.java index f47a432..80e53d4 100644 --- a/src/main/java/org/apache/sysml/runtime/compress/ColGroupOLE.java +++ b/src/main/java/org/apache/sysml/runtime/compress/ColGroupOLE.java @@ -380,8 +380,8 @@ public class ColGroupOLE extends ColGroupOffset //step 1: prepare position and value arrays //current pos per OLs / output values - int[] apos = new int[numVals]; - double[] cvals = new double[numVals]; + int[] apos = allocIVector(numVals, true); + double[] cvals = allocDVector(numVals, true); //step 2: cache conscious matrix-vector via horizontal scans for( int ai=0; ai<n; ai+=blksz2 ) @@ -687,7 +687,7 @@ public class ColGroupOLE extends ColGroupOffset * @return array of positions for all values */ private int[] skipScan(int numVals, int rl) { - int[] ret = new int[numVals]; + int[] ret = allocIVector(numVals, rl==0); final int blksz = BitmapEncoder.BITMAP_BLOCK_SZ; if( rl > 0 ) { //rl aligned with blksz http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/f5b78a32/src/main/java/org/apache/sysml/runtime/compress/ColGroupRLE.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/compress/ColGroupRLE.java b/src/main/java/org/apache/sysml/runtime/compress/ColGroupRLE.java index 6d2ec43..63786d0 100644 --- a/src/main/java/org/apache/sysml/runtime/compress/ColGroupRLE.java +++ b/src/main/java/org/apache/sysml/runtime/compress/ColGroupRLE.java @@ -349,9 +349,9 @@ public class ColGroupRLE extends ColGroupOffset //step 1: prepare position and value arrays //current pos per OLs / output values - int[] apos = new int[numVals]; int[] astart = new int[numVals]; - double[] cvals = new double[numVals]; + int[] apos = allocIVector(numVals, true); + double[] cvals = allocDVector(numVals, true); //step 2: cache conscious matrix-vector via horizontal scans for( int ai=0; ai<n; 
ai+=blksz ) @@ -677,7 +677,7 @@ public class ColGroupRLE extends ColGroupOffset * @return array of positions for all values */ private int[] skipScan(int numVals, int rl, int[] astart) { - int[] apos = new int[numVals]; + int[] apos = allocIVector(numVals, rl==0); if( rl > 0 ) { //rl aligned with blksz for (int k = 0; k < numVals; k++) { http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/f5b78a32/src/main/java/org/apache/sysml/runtime/compress/ColGroupValue.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/compress/ColGroupValue.java b/src/main/java/org/apache/sysml/runtime/compress/ColGroupValue.java index 928296d..d8870a4 100644 --- a/src/main/java/org/apache/sysml/runtime/compress/ColGroupValue.java +++ b/src/main/java/org/apache/sysml/runtime/compress/ColGroupValue.java @@ -28,6 +28,7 @@ import org.apache.sysml.runtime.functionobjects.KahanFunction; import org.apache.sysml.runtime.functionobjects.KahanPlus; import org.apache.sysml.runtime.instructions.cp.KahanObject; import org.apache.sysml.runtime.matrix.data.MatrixBlock; +import org.apache.sysml.runtime.matrix.data.Pair; import org.apache.sysml.runtime.matrix.operators.AggregateUnaryOperator; import org.apache.sysml.runtime.matrix.operators.ScalarOperator; @@ -46,7 +47,11 @@ public abstract class ColGroupValue extends ColGroup //slight performance decrease for parallel incl multi-threaded, hence not applied for //distributed operations (also because compression time + garbage collection increases) public static final boolean SORT_VALUES_BY_LENGTH = true; - + + //thread-local pairs of reusable temporary vectors for positions and values + private static ThreadLocal<Pair<int[], double[]>> memPool = new ThreadLocal<Pair<int[], double[]>>() { + @Override protected Pair<int[], double[]> initialValue() { return new Pair<int[], double[]>(); } + }; /** Distinct values associated with individual bitmaps. 
*/ protected double[] _values; //linearized <numcol vals> <numcol vals> @@ -184,7 +189,12 @@ public abstract class ColGroupValue extends ColGroup } protected final double[] preaggValues(int numVals, double[] b) { - double[] ret = new double[numVals]; + return preaggValues(numVals, b, false); + } + + protected final double[] preaggValues(int numVals, double[] b, boolean allocNew) { + double[] ret = allocNew ? new double[numVals] : + allocDVector(numVals, false); for( int k = 0; k < numVals; k++ ) ret[k] = sumValues(k, b); @@ -300,4 +310,46 @@ public abstract class ColGroupValue extends ColGroup */ public abstract void unaryAggregateOperations(AggregateUnaryOperator op, MatrixBlock result, int rl, int ru) throws DMLRuntimeException; + + + //dynamic memory management + + public static void setupThreadLocalMemory(int len) { + Pair<int[], double[]> p = new Pair<int[], double[]>(); + p.setKey(new int[len]); + p.setValue(new double[len]); + memPool.set(p); + } + + public static void cleanupThreadLocalMemory() { + memPool.remove(); + } + + protected static double[] allocDVector(int len, boolean reset) { + Pair<int[], double[]> p = memPool.get(); + + //sanity check for missing setup + if( p.getValue() == null ) + return new double[len]; + + //get and reset if necessary + double[] tmp = p.getValue(); + if( reset ) + Arrays.fill(tmp, 0, len, 0); + return tmp; + } + + protected static int[] allocIVector(int len, boolean reset) { + Pair<int[], double[]> p = memPool.get(); + + //sanity check for missing setup + if( p.getKey() == null ) + return new int[len]; + + //get and reset if necessary + int[] tmp = p.getKey(); + if( reset ) + Arrays.fill(tmp, 0, len, 0); + return tmp; + } } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/f5b78a32/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java ---------------------------------------------------------------------- diff --git 
a/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java b/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java index 7faed54..ead0b19 100644 --- a/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java +++ b/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java @@ -1358,6 +1358,8 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable private static void rightMultByVector(ArrayList<ColGroup> groups, MatrixBlock vect, MatrixBlock ret, boolean inclUC, int rl, int ru) throws DMLRuntimeException { + ColGroupValue.setupThreadLocalMemory(getMaxNumValues(groups)); + boolean cacheDDC1 = ColGroupValue.LOW_LEVEL_OPT && ru-rl > ColGroupOffset.WRITE_CACHE_BLKSZ; @@ -1383,6 +1385,8 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable if( !(grp instanceof ColGroupUncompressed) && !(cacheDDC1 && grp instanceof ColGroupDDC1) ) grp.rightMultByVector(vect, ret, rl, ru); + + ColGroupValue.cleanupThreadLocalMemory(); } /** @@ -1412,12 +1416,16 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable result.reset(); result.allocateDenseBlock(); + // setup memory pool for reuse + ColGroupValue.setupThreadLocalMemory(getMaxNumValues(colGroups)); + // delegate matrix-vector operation to each column group for (ColGroup grp : colGroups) { grp.leftMultByRowVector(rowVector, result); } - + // post-processing + ColGroupValue.cleanupThreadLocalMemory(); result.recomputeNonZeros(); } @@ -1457,10 +1465,10 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable //compute remaining compressed column groups in parallel ExecutorService pool = Executors.newFixedThreadPool( Math.min(colGroups.size()-((uc!=null)?1:0), k) ); + ArrayList<ColGroup>[] grpParts = createStaticTaskPartitioning(4*k, false); ArrayList<LeftMatrixMultTask> tasks = new ArrayList<LeftMatrixMultTask>(); - for( ColGroup grp : colGroups ) - 
if( !(grp instanceof ColGroupUncompressed) ) - tasks.add(new LeftMatrixMultTask(grp, rowVector, result)); + for( ArrayList<ColGroup> groups : grpParts ) + tasks.add(new LeftMatrixMultTask(groups, rowVector, result)); List<Future<Object>> ret = pool.invokeAll(tasks); pool.shutdown(); for( Future<Object> tmp : ret ) @@ -1535,6 +1543,15 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable return grpParts; } + + private static int getMaxNumValues(List<ColGroup> groups) { + int numVals = 1; + for( ColGroup grp : groups ) + if( grp instanceof ColGroupValue ) + numVals = Math.max(numVals, + ((ColGroupValue)grp).getNumValues()); + return numVals; + } private ColGroupUncompressed getUncompressedColGroup() { @@ -1547,12 +1564,12 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable private static class LeftMatrixMultTask implements Callable<Object> { - private final ColGroup _group; + private final ArrayList<ColGroup> _groups; private final MatrixBlock _vect; private final MatrixBlock _ret; - protected LeftMatrixMultTask( ColGroup group, MatrixBlock vect, MatrixBlock ret) { - _group = group; + protected LeftMatrixMultTask( ArrayList<ColGroup> groups, MatrixBlock vect, MatrixBlock ret) { + _groups = groups; _vect = vect; _ret = ret; } @@ -1560,8 +1577,14 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable @Override public Object call() throws DMLRuntimeException { + // setup memory pool for reuse + ColGroupValue.setupThreadLocalMemory(getMaxNumValues(_groups)); + // delegate matrix-vector operation to each column group - _group.leftMultByRowVector(_vect, _ret); + for(ColGroup grp : _groups) + grp.leftMultByRowVector(_vect, _ret); + + ColGroupValue.cleanupThreadLocalMemory(); return null; } }