Repository: incubator-systemml Updated Branches: refs/heads/master 6cac5ea75 -> 3e28592ef
[SYSTEMML-844] Simplified internal compression thresholds and parameters Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/3e28592e Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/3e28592e Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/3e28592e Branch: refs/heads/master Commit: 3e28592ef9c8eb541901b77b646db7a7b7f596a7 Parents: 6cac5ea Author: Matthias Boehm <[email protected]> Authored: Tue Aug 2 23:03:42 2016 -0700 Committer: Matthias Boehm <[email protected]> Committed: Tue Aug 2 23:06:26 2016 -0700 ---------------------------------------------------------------------- .../runtime/compress/CompressedMatrixBlock.java | 114 +++++++++---------- .../sysml/runtime/compress/PlanningCoCoder.java | 17 +-- 2 files changed, 59 insertions(+), 72 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/3e28592e/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java b/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java index 2f14f1c..6e2bca8 100644 --- a/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java +++ b/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java @@ -92,9 +92,6 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable private static final long serialVersionUID = 7319972089143154057L; //internal configuration - public static final int MAX_NUMBER_COCODING_COLUMNS = 1000; - public static final double MIN_COMPRESSION_RATIO = 2.0; - public static final double MIN_RLE_RATIO = 1.0; // Minimum additional compression (non-RLE size / RLE size) before we switch to run-length 
encoding. public static final boolean TRANSPOSE_INPUT = true; public static final boolean MATERIALIZE_ZEROS = false; public static final long MIN_PAR_AGG_THRESHOLD = 16*1024*1024; //16MB @@ -230,6 +227,7 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable final int numRows = getNumRows(); final int numCols = getNumColumns(); final boolean sparse = isInSparseFormat(); + final double sp = OptimizerUtils.getSparsity(numRows, numCols, getNonZeros()); MatrixBlock rawblock = !TRANSPOSE_INPUT ? new MatrixBlock(this) : LibMatrixReorg.transpose(this, new MatrixBlock(numCols, numRows, sparse), k); @@ -247,7 +245,7 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable // PHASE 1: Classify columns by compression type // We start by determining which columns are amenable to bitmap compression - double uncompressedColumnSize = getUncompressedSize(numRows, 1); + double uncompressedColumnSize = getUncompressedSize(numRows, 1, sp); // information about the bitmap amenable columns List<Integer> bitmapCols = new ArrayList<Integer>(); @@ -256,20 +254,15 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable List<Long> compressedSizes = new ArrayList<Long>(); HashMap<Integer, Double> compressionRatios = new HashMap<Integer, Double>(); - // Minimum ratio (size of uncompressed / size of compressed) that we - // will accept when encoding a field with a bitmap. + // Classify columns according to ratio (size uncompressed / size compressed), + // where a column is compressible if ratio > 1. CompressedSizeInfo[] sizeInfos = (k > 1) ? 
computeCompressedSizeInfos(bitmapSizeEstimator, numCols, k) : computeCompressedSizeInfos(bitmapSizeEstimator, numCols); - for (int col = 0; col < numCols; col++) - { + for (int col = 0; col < numCols; col++) { long compressedSize = sizeInfos[col].getMinSize(); - double compRatio = uncompressedColumnSize / compressedSize; - - //FIXME: compression ratio should be checked against 1 instead of min compression - //ratio; I think this threshold was only required because we overestimated the - //the uncompressed column size with n\alpha instead of z\alpha - if (compRatio >= MIN_COMPRESSION_RATIO) { + double compRatio = uncompressedColumnSize / compressedSize; + if (compRatio > 1) { bitmapCols.add(col); compressionRatios.put(col, compRatio); colsCards.add(sizeInfos[col].getEstCarinality()); @@ -280,30 +273,20 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable } _stats.timePhase1 = time.stop(); - if( LOG.isDebugEnabled() ) - LOG.debug("compression phase 1: "+_stats.timePhase1); - - // Filters for additional types of compression should be inserted here. + if( LOG.isDebugEnabled() ) { + LOG.debug("Compression statistics:"); + LOG.debug("--compression phase 1: "+_stats.timePhase1); + } // PHASE 2: Grouping columns // Divide the bitmap columns into column groups. - List<int[]> bitmapColGrps = null; - if (bitmapCols.size() > MAX_NUMBER_COCODING_COLUMNS) { - // Too many columns to compute co-coding groups with current methods. - // Generate singleton groups. - bitmapColGrps = new ArrayList<int[]>(bitmapCols.size()); - for (int col : bitmapCols) - bitmapColGrps.add(new int[] { col }); - } - else { - bitmapColGrps = PlanningCoCoder.findCocodesByPartitioning(bitmapSizeEstimator, - bitmapCols, colsCards, compressedSizes, numRows, isInSparseFormat() ? 
- OptimizerUtils.getSparsity(numRows, numCols, getNonZeros()):1, k); - } + List<int[]> bitmapColGrps = PlanningCoCoder.findCocodesByPartitioning( + bitmapSizeEstimator, bitmapCols, colsCards, compressedSizes, numRows, + isInSparseFormat() ? sp : 1, k); _stats.timePhase2 = time.stop(); if( LOG.isDebugEnabled() ) - LOG.debug("compression phase 2: "+_stats.timePhase2); + LOG.debug("--compression phase 2: "+_stats.timePhase2); if( INVESTIGATE_ESTIMATES ) { double est = 0; @@ -315,8 +298,8 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable // PHASE 3: Compress and correct sample-based decisions ColGroup[] colGroups = (k > 1) ? - compressColGroups(rawblock, bitmapSizeEstimator, compressionRatios, numRows, bitmapColGrps, k) : - compressColGroups(rawblock, bitmapSizeEstimator, compressionRatios, numRows, bitmapColGrps); + compressColGroups(rawblock, bitmapSizeEstimator, compressionRatios, numRows, sp, bitmapColGrps, k) : + compressColGroups(rawblock, bitmapSizeEstimator, compressionRatios, numRows, sp, bitmapColGrps); allocateColGroupList(); for( int j=0; j<colGroups.length; j++ ) { if( colGroups[j] != null ) { @@ -328,23 +311,30 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable _stats.timePhase3 = time.stop(); if( LOG.isDebugEnabled() ) - LOG.debug("compression phase 3: "+_stats.timePhase3); + LOG.debug("--compression phase 3: "+_stats.timePhase3); // Phase 4: Cleanup // The remaining columns are stored uncompressed as one big column group - if (remainingCols.size() > 0) { + if( !remainingCols.isEmpty() ) { ArrayList<Integer> list = new ArrayList<Integer>(remainingCols); ColGroupUncompressed ucgroup = new ColGroupUncompressed(list, rawblock); _colGroups.add(ucgroup); } - + + _stats.size = estimateCompressedSizeInMemory(); + _stats.ratio= estimateSizeInMemory() / _stats.size; + //final cleanup (discard uncompressed block) rawblock.cleanupBlock(true, true); this.cleanupBlock(true, true); _stats.timePhase4 = 
time.stop(); - if( LOG.isDebugEnabled() ) - LOG.debug("compression phase 4: "+_stats.timePhase4); + if( LOG.isDebugEnabled() ) { + LOG.debug("--compression phase 4: "+_stats.timePhase4); + LOG.debug("--num col groups: "+_colGroups.size()); + LOG.debug("--compressed size: "+_stats.size); + LOG.debug("--compression ratio: "+_stats.ratio); + } } public CompressionStatistics getCompressionStatistics() { @@ -401,11 +391,11 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable * @param groups * @return */ - private static ColGroup[] compressColGroups(MatrixBlock in, CompressedSizeEstimator estim, HashMap<Integer, Double> compRatios, int rlen, List<int[]> groups) + private static ColGroup[] compressColGroups(MatrixBlock in, CompressedSizeEstimator estim, HashMap<Integer, Double> compRatios, int rlen, double sp, List<int[]> groups) { ColGroup[] ret = new ColGroup[groups.size()]; for( int i=0; i<groups.size(); i++ ) - ret[i] = compressColGroup(in, estim, compRatios, rlen, groups.get(i)); + ret[i] = compressColGroup(in, estim, compRatios, rlen, sp, groups.get(i)); return ret; } @@ -421,14 +411,14 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable * @return * @throws DMLRuntimeException */ - private static ColGroup[] compressColGroups(MatrixBlock in, CompressedSizeEstimator estim, HashMap<Integer, Double> compRatios, int rlen, List<int[]> groups, int k) + private static ColGroup[] compressColGroups(MatrixBlock in, CompressedSizeEstimator estim, HashMap<Integer, Double> compRatios, int rlen, double sp, List<int[]> groups, int k) throws DMLRuntimeException { try { ExecutorService pool = Executors.newFixedThreadPool( k ); ArrayList<CompressTask> tasks = new ArrayList<CompressTask>(); for( int[] colIndexes : groups ) - tasks.add(new CompressTask(in, estim, compRatios, rlen, colIndexes)); + tasks.add(new CompressTask(in, estim, compRatios, rlen, sp, colIndexes)); List<Future<ColGroup>> rtask = pool.invokeAll(tasks); 
ArrayList<ColGroup> ret = new ArrayList<ColGroup>(); for( Future<ColGroup> lrtask : rtask ) @@ -450,7 +440,7 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable * @param colIndexes * @return */ - private static ColGroup compressColGroup(MatrixBlock in, CompressedSizeEstimator estim, HashMap<Integer, Double> compRatios, int rlen, int[] colIndexes) + private static ColGroup compressColGroup(MatrixBlock in, CompressedSizeEstimator estim, HashMap<Integer, Double> compRatios, int rlen, double sp, int[] colIndexes) { int[] allGroupIndices = null; int allColsCount = colIndexes.length; @@ -464,17 +454,17 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable { //exact big list and observe compression ratio ubm = BitmapEncoder.extractBitmap(colIndexes, in); - sizeInfo = estim.estimateCompressedColGroupSize(ubm); - double compRatio = getUncompressedSize(rlen, colIndexes.length) / sizeInfo.getMinSize(); + sizeInfo = estim.estimateCompressedColGroupSize(ubm); + double compRatio = getUncompressedSize(rlen, colIndexes.length, sp) / sizeInfo.getMinSize(); - if (compRatio >= MIN_COMPRESSION_RATIO) { + if( compRatio > 1 ) { break; // we have a good group } // modify the group if (compRatioPQ == null) { // first modification - allGroupIndices = Arrays.copyOf(colIndexes, colIndexes.length); + allGroupIndices = colIndexes.clone(); compRatioPQ = new PriorityQueue<CompressedMatrixBlock.CompressedColumn>(); for (int i = 0; i < colIndexes.length; i++) compRatioPQ.add(new CompressedColumn(i, compRatios.get(colIndexes[i]))); @@ -491,7 +481,7 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable colIndexes = new int[allColsCount]; // copying the values that do not equal -1 int ix = 0; - for (int col : allGroupIndices) + for(int col : allGroupIndices) if (col != -1) colIndexes[ix++] = col; } @@ -501,26 +491,26 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable return 
null; //create compressed column group - long rleNumBytes = sizeInfo.getRLESize(); - long offsetNumBytes = sizeInfo.getOLESize(); - double rleRatio = (double) offsetNumBytes / (double) rleNumBytes; - if (rleRatio > MIN_RLE_RATIO) + long rleSize = sizeInfo.getRLESize(); + long oleSize = sizeInfo.getOLESize(); + if( rleSize < oleSize ) return new ColGroupRLE(colIndexes, rlen, ubm); else return new ColGroupOLE(colIndexes, rlen, ubm); } /** + * Compute a conservative estimate of the uncompressed size of a column group. * * @param rlen * @param clen * @return */ - private static double getUncompressedSize(int rlen, int clen) { - // It is correct to use the dense size as the uncompressed size - // FIXME not numRows but nnz / col otherwise too aggressive overestimation - // of uncompressed size and hence overestimation of compression potential - return 8 * rlen * clen; + private static double getUncompressedSize(int rlen, int clen, double sparsity) { + //we estimate the uncompressed size as 8 * nnz in order to cover both + //sparse and dense with moderate underestimation (which is conservative as + //it is biased towards uncompressed columns) + return 8 * rlen * clen * sparsity; } /** @@ -596,6 +586,8 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable public double timePhase3 = -1; public double timePhase4 = -1; public double estSize = -1; + public double size = -1; + public double ratio = -1; public CompressionStatistics() { //do nothing @@ -1512,19 +1504,21 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable private CompressedSizeEstimator _estim = null; private HashMap<Integer, Double> _compRatios = null; private int _rlen = -1; + private double _sp = -1; private int[] _colIndexes = null; - protected CompressTask( MatrixBlock in, CompressedSizeEstimator estim, HashMap<Integer, Double> compRatios, int rlen, int[] colIndexes ) { + protected CompressTask( MatrixBlock in, CompressedSizeEstimator estim, 
HashMap<Integer, Double> compRatios, int rlen, double sp, int[] colIndexes ) { _in = in; _estim = estim; _compRatios = compRatios; _rlen = rlen; + _sp = sp; _colIndexes = colIndexes; } @Override public ColGroup call() throws DMLRuntimeException { - return compressColGroup(_in, _estim, _compRatios, _rlen, _colIndexes); + return compressColGroup(_in, _estim, _compRatios, _rlen, _sp, _colIndexes); } } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/3e28592e/src/main/java/org/apache/sysml/runtime/compress/PlanningCoCoder.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/compress/PlanningCoCoder.java b/src/main/java/org/apache/sysml/runtime/compress/PlanningCoCoder.java index d1fd7af..711f383 100644 --- a/src/main/java/org/apache/sysml/runtime/compress/PlanningCoCoder.java +++ b/src/main/java/org/apache/sysml/runtime/compress/PlanningCoCoder.java @@ -36,11 +36,9 @@ public class PlanningCoCoder { //constants for weight computation private final static float GROUPABILITY_THRESHOLD = 0.00064f; - private final static boolean USE_BIN_WEIGHT = false; private final static float PARTITION_WEIGHT = 0.05F; //higher values lead to more grouping private final static float PARTITION_SIZE = PARTITION_WEIGHT * GROUPABILITY_THRESHOLD; - private final static float BIN_WEIGHT_PARAM = -0.65f; //lower values lead to more grouping - + /** * * @param sizeEstimator @@ -243,15 +241,10 @@ public class PlanningCoCoder * @param sparsity * @return */ - private static float computeWeightForCoCoding(int numRows, double sparsity) - { - if( USE_BIN_WEIGHT ) { //new method (non-conclusive) - //return (float) Math.pow(numRows*sparsity,BIN_WEIGHT_PARAM); - return (float) Math.pow(numRows,BIN_WEIGHT_PARAM); - } - else { - return PARTITION_SIZE; - } + private static float computeWeightForCoCoding(int numRows, double sparsity) { + //we use a constant partition size (independent of the number of rows + //in order to 
ensure constant compression speed independent of blocking) + return PARTITION_SIZE; } /**
