Repository: incubator-systemml Updated Branches: refs/heads/master 13211190f -> b36fb29ea
[SYSTEMML-821] Extended multi-threaded compression (parallel co-coding) Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/b36fb29e Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/b36fb29e Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/b36fb29e Branch: refs/heads/master Commit: b36fb29eac81a2b28b862c299b5c01e8b30b8a86 Parents: 1321119 Author: Matthias Boehm <[email protected]> Authored: Fri Jul 29 17:11:53 2016 -0700 Committer: Matthias Boehm <[email protected]> Committed: Fri Jul 29 17:20:01 2016 -0700 ---------------------------------------------------------------------- .../runtime/compress/CompressedMatrixBlock.java | 18 +-- .../sysml/runtime/compress/PlanningCoCoder.java | 132 +++++++++++++++---- .../runtime/compress/PlanningCoCodingGroup.java | 7 +- 3 files changed, 117 insertions(+), 40 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/b36fb29e/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java b/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java index f2ccb43..2f14f1c 100644 --- a/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java +++ b/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java @@ -237,9 +237,6 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable CompressedSizeEstimator bitmapSizeEstimator = SizeEstimatorFactory.getSizeEstimator(rawblock, numRows); - //allocate list of column groups - allocateColGroupList(); - // The current implementation of this method is written for correctness, // not for performance or for minimal use of temporary space. @@ -255,7 +252,7 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable // information about the bitmap amenable columns List<Integer> bitmapCols = new ArrayList<Integer>(); List<Integer> uncompressedCols = new ArrayList<Integer>(); - List<Integer> colsCardinalities = new ArrayList<Integer>(); + List<Integer> colsCards = new ArrayList<Integer>(); List<Long> compressedSizes = new ArrayList<Long>(); HashMap<Integer, Double> compressionRatios = new HashMap<Integer, Double>(); @@ -275,7 +272,7 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable if (compRatio >= MIN_COMPRESSION_RATIO) { bitmapCols.add(col); compressionRatios.put(col, compRatio); - colsCardinalities.add(sizeInfos[col].getEstCarinality()); + colsCards.add(sizeInfos[col].getEstCarinality()); compressedSizes.add(compressedSize); } else @@ -295,15 +292,13 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable // Too many columns to compute co-coding groups with current methods. // Generate singleton groups. bitmapColGrps = new ArrayList<int[]>(bitmapCols.size()); - for (int col : bitmapCols) { + for (int col : bitmapCols) bitmapColGrps.add(new int[] { col }); - } } else { - bitmapColGrps = PlanningCoCoder.findCocodesByPartitioning( - bitmapSizeEstimator, bitmapCols, colsCardinalities, - compressedSizes, numRows, isInSparseFormat() ? - OptimizerUtils.getSparsity(numRows, numCols, getNonZeros()): 1); + bitmapColGrps = PlanningCoCoder.findCocodesByPartitioning(bitmapSizeEstimator, + bitmapCols, colsCards, compressedSizes, numRows, isInSparseFormat() ? + OptimizerUtils.getSparsity(numRows, numCols, getNonZeros()):1, k); } _stats.timePhase2 = time.stop(); @@ -322,6 +317,7 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable ColGroup[] colGroups = (k > 1) ? compressColGroups(rawblock, bitmapSizeEstimator, compressionRatios, numRows, bitmapColGrps, k) : compressColGroups(rawblock, bitmapSizeEstimator, compressionRatios, numRows, bitmapColGrps); + allocateColGroupList(); for( int j=0; j<colGroups.length; j++ ) { if( colGroups[j] != null ) { for( int col : colGroups[j].getColIndices() ) http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/b36fb29e/src/main/java/org/apache/sysml/runtime/compress/PlanningCoCoder.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/compress/PlanningCoCoder.java b/src/main/java/org/apache/sysml/runtime/compress/PlanningCoCoder.java index 07d9757..d1fd7af 100644 --- a/src/main/java/org/apache/sysml/runtime/compress/PlanningCoCoder.java +++ b/src/main/java/org/apache/sysml/runtime/compress/PlanningCoCoder.java @@ -24,7 +24,12 @@ import java.util.HashMap; import java.util.List; import java.util.PriorityQueue; import java.util.TreeMap; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.compress.estim.CompressedSizeEstimator; public class PlanningCoCoder @@ -45,58 +50,110 @@ public class PlanningCoCoder * @param numRows * @param sparsity * @return + * @throws DMLRuntimeException */ public static List<int[]> findCocodesByPartitioning(CompressedSizeEstimator sizeEstimator, List<Integer> availCols, - List<Integer> colsCardinalities,List<Long> compressedSize, int numRows, double sparsity) + List<Integer> colsCardinalities, List<Long> compressedSize, int numRows, double sparsity, int k) + throws DMLRuntimeException { - float numRowsWeight = numRows; List<int[]> retGroups = new ArrayList<int[]>(); + // filtering out non-groupable columns as singleton groups - int numCols = availCols.size(); - List<Integer> groupabaleCols = new ArrayList<Integer>(); // weighted of each column is the ratio of its cardinality to the number // of rows scaled by the matrix sparsity - List<Float> groupabaleColWeights = new ArrayList<Float>(); - HashMap<Integer, GroupableColInfo> groupableColsInfo = new HashMap<Integer, GroupableColInfo>(); + int numCols = availCols.size(); + List<Integer> groupCols = new ArrayList<Integer>(); + List<Float> groupColWeights = new ArrayList<Float>(); + HashMap<Integer, GroupableColInfo> groupColsInfo = new HashMap<Integer, GroupableColInfo>(); for (int i = 0; i < numCols; i++) { int colIx = availCols.get(i); int cardinality = colsCardinalities.get(i); - float weight = ((float) cardinality) / numRowsWeight; + float weight = ((float) cardinality) / numRows; if (weight <= GROUPABILITY_THRESHOLD) { - groupabaleCols.add(colIx); - groupabaleColWeights.add(weight); - groupableColsInfo.put(colIx, new GroupableColInfo(weight, - compressedSize.get(i))); + groupCols.add(colIx); + groupColWeights.add(weight); + groupColsInfo.put(colIx, new GroupableColInfo(weight,compressedSize.get(i))); } else { retGroups.add(new int[] { colIx }); } } + // bin packing based on PARTITION_WEIGHT and column weights float weight = computeWeightForCoCoding(numRows, sparsity); TreeMap<Float, List<List<Integer>>> bins = new PlanningBinPacker( - weight, groupabaleCols, groupabaleColWeights) - .packFirstFit(); + weight, groupCols, groupColWeights).packFirstFit(); // brute force grouping within each partition + retGroups.addAll( (k > 1) ? + getCocodingGroupsBruteForce(bins, groupColsInfo, sizeEstimator, numRows, k) : + getCocodingGroupsBruteForce(bins, groupColsInfo, sizeEstimator, numRows)); + + return retGroups; + } + + /** + * + * @param bins + * @param groupColsInfo + * @param estim + * @param rlen + * @return + */ + private static List<int[]> getCocodingGroupsBruteForce(TreeMap<Float, List<List<Integer>>> bins, HashMap<Integer, GroupableColInfo> groupColsInfo, CompressedSizeEstimator estim, int rlen) + { + List<int[]> retGroups = new ArrayList<int[]>(); for (List<List<Integer>> binList : bins.values()) { for (List<Integer> bin : binList) { // building an array of singleton CoCodingGroup - PlanningCoCodingGroup[] singltonGroups = new PlanningCoCodingGroup[bin.size()]; - int i = 0; - GroupableColInfo colInfo; - for (Integer col : bin) { - colInfo = groupableColsInfo.get(col); - singltonGroups[i++] = new PlanningCoCodingGroup(col, colInfo.size, - colInfo.cardRatio); - } + ArrayList<PlanningCoCodingGroup> sgroups = new ArrayList<PlanningCoCodingGroup>(); + for (Integer col : bin) + sgroups.add(new PlanningCoCodingGroup(col, groupColsInfo.get(col))); + // brute force co-coding PlanningCoCodingGroup[] outputGroups = findCocodesBruteForce( - sizeEstimator, numRowsWeight, singltonGroups); - - for (PlanningCoCodingGroup grp : outputGroups) { + estim, rlen, sgroups.toArray(new PlanningCoCodingGroup[0])); + for (PlanningCoCodingGroup grp : outputGroups) retGroups.add(grp.getColIndices()); - } } } + + return retGroups; + } + + /** + * + * @param bins + * @param groupColsInfo + * @param estim + * @param rlen + * @param k + * @return + * @throws DMLRuntimeException + */ + private static List<int[]> getCocodingGroupsBruteForce(TreeMap<Float, List<List<Integer>>> bins, HashMap<Integer, GroupableColInfo> groupColsInfo, CompressedSizeEstimator estim, int rlen, int k) + throws DMLRuntimeException + { + List<int[]> retGroups = new ArrayList<int[]>(); + try { + ExecutorService pool = Executors.newFixedThreadPool( k ); + ArrayList<CocodeTask> tasks = new ArrayList<CocodeTask>(); + for (List<List<Integer>> binList : bins.values()) + for (List<Integer> bin : binList) { + // building an array of singleton CoCodingGroup + ArrayList<PlanningCoCodingGroup> sgroups = new ArrayList<PlanningCoCodingGroup>(); + for (Integer col : bin) + sgroups.add(new PlanningCoCodingGroup(col, groupColsInfo.get(col))); + tasks.add(new CocodeTask(estim, sgroups, rlen)); + } + List<Future<PlanningCoCodingGroup[]>> rtask = pool.invokeAll(tasks); + for( Future<PlanningCoCodingGroup[]> lrtask : rtask ) + for (PlanningCoCodingGroup grp : lrtask.get()) + retGroups.add(grp.getColIndices()); + pool.shutdown(); + } + catch(Exception ex) { + throw new DMLRuntimeException(ex); + } + return retGroups; } @@ -215,7 +272,7 @@ public class PlanningCoCoder /** * */ - private static class GroupableColInfo { + protected static class GroupableColInfo { float cardRatio; long size; @@ -224,4 +281,27 @@ public class PlanningCoCoder size = lsize; } } + + /** + * + */ + private static class CocodeTask implements Callable<PlanningCoCodingGroup[]> + { + private CompressedSizeEstimator _estim = null; + private ArrayList<PlanningCoCodingGroup> _sgroups = null; + private int _rlen = -1; + + protected CocodeTask( CompressedSizeEstimator estim, ArrayList<PlanningCoCodingGroup> sgroups, int rlen ) { + _estim = estim; + _sgroups = sgroups; + _rlen = rlen; + } + + @Override + public PlanningCoCodingGroup[] call() throws DMLRuntimeException { + // brute force co-coding + return findCocodesBruteForce(_estim, _rlen, + _sgroups.toArray(new PlanningCoCodingGroup[0])); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/b36fb29e/src/main/java/org/apache/sysml/runtime/compress/PlanningCoCodingGroup.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/compress/PlanningCoCodingGroup.java b/src/main/java/org/apache/sysml/runtime/compress/PlanningCoCodingGroup.java index 6553c20..a1e0123 100644 --- a/src/main/java/org/apache/sysml/runtime/compress/PlanningCoCodingGroup.java +++ b/src/main/java/org/apache/sysml/runtime/compress/PlanningCoCodingGroup.java @@ -21,6 +21,7 @@ package org.apache.sysml.runtime.compress; import java.util.Arrays; +import org.apache.sysml.runtime.compress.PlanningCoCoder.GroupableColInfo; import org.apache.sysml.runtime.compress.estim.CompressedSizeEstimator; import org.apache.sysml.runtime.compress.estim.CompressedSizeInfo; @@ -38,10 +39,10 @@ public class PlanningCoCodingGroup * Constructor for a one-column group; i.e. do not co-code a given column. * */ - public PlanningCoCodingGroup(int col, long estSize, float cardRatio) { + public PlanningCoCodingGroup(int col, GroupableColInfo info) { _colIndexes = new int[]{col}; - _estSize = estSize; - _cardRatio = cardRatio; + _estSize = info.size; + _cardRatio = info.cardRatio; } /**
