Repository: incubator-systemml
Updated Branches:
  refs/heads/master 13211190f -> b36fb29ea


[SYSTEMML-821] Extended multi-threaded compression (parallel co-coding)

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/b36fb29e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/b36fb29e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/b36fb29e

Branch: refs/heads/master
Commit: b36fb29eac81a2b28b862c299b5c01e8b30b8a86
Parents: 1321119
Author: Matthias Boehm <[email protected]>
Authored: Fri Jul 29 17:11:53 2016 -0700
Committer: Matthias Boehm <[email protected]>
Committed: Fri Jul 29 17:20:01 2016 -0700

----------------------------------------------------------------------
 .../runtime/compress/CompressedMatrixBlock.java |  18 +--
 .../sysml/runtime/compress/PlanningCoCoder.java | 132 +++++++++++++++----
 .../runtime/compress/PlanningCoCodingGroup.java |   7 +-
 3 files changed, 117 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/b36fb29e/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java b/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
index f2ccb43..2f14f1c 100644
--- a/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
@@ -237,9 +237,6 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
                CompressedSizeEstimator bitmapSizeEstimator = 
                                SizeEstimatorFactory.getSizeEstimator(rawblock, 
numRows);
 
-               //allocate list of column groups
-               allocateColGroupList();
-
                // The current implementation of this method is written for 
correctness,
                // not for performance or for minimal use of temporary space.
 
@@ -255,7 +252,7 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
                // information about the bitmap amenable columns
                List<Integer> bitmapCols = new ArrayList<Integer>();
                List<Integer> uncompressedCols = new ArrayList<Integer>();
-               List<Integer> colsCardinalities = new ArrayList<Integer>();
+               List<Integer> colsCards = new ArrayList<Integer>();
                List<Long> compressedSizes = new ArrayList<Long>();
                HashMap<Integer, Double> compressionRatios = new 
HashMap<Integer, Double>();
                
@@ -275,7 +272,7 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
                        if (compRatio >= MIN_COMPRESSION_RATIO) {
                                bitmapCols.add(col);
                                compressionRatios.put(col, compRatio);
-                               
colsCardinalities.add(sizeInfos[col].getEstCarinality());
+                               
colsCards.add(sizeInfos[col].getEstCarinality());
                                compressedSizes.add(compressedSize);
                        }
                        else
@@ -295,15 +292,13 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
                        // Too many columns to compute co-coding groups with 
current methods.
                        // Generate singleton groups.
                        bitmapColGrps = new ArrayList<int[]>(bitmapCols.size());
-                       for (int col : bitmapCols) {
+                       for (int col : bitmapCols)
                                bitmapColGrps.add(new int[] { col });
-                       }
                } 
                else {
-                       bitmapColGrps = 
PlanningCoCoder.findCocodesByPartitioning(
-                                       bitmapSizeEstimator, bitmapCols, 
colsCardinalities,
-                                       compressedSizes, numRows, 
isInSparseFormat() ? 
-                                       OptimizerUtils.getSparsity(numRows, 
numCols, getNonZeros()): 1);
+                       bitmapColGrps = 
PlanningCoCoder.findCocodesByPartitioning(bitmapSizeEstimator, 
+                                       bitmapCols, colsCards, compressedSizes, 
numRows, isInSparseFormat() ? 
+                                       OptimizerUtils.getSparsity(numRows, 
numCols, getNonZeros()):1, k);
                }
 
                _stats.timePhase2 = time.stop();
@@ -322,6 +317,7 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
                ColGroup[] colGroups = (k > 1) ?
                                compressColGroups(rawblock, 
bitmapSizeEstimator, compressionRatios, numRows, bitmapColGrps, k) : 
                                compressColGroups(rawblock, 
bitmapSizeEstimator, compressionRatios, numRows, bitmapColGrps);    
+               allocateColGroupList();
                for( int j=0; j<colGroups.length; j++ ) {
                        if( colGroups[j] != null ) {
                                for( int col : colGroups[j].getColIndices() )

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/b36fb29e/src/main/java/org/apache/sysml/runtime/compress/PlanningCoCoder.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/PlanningCoCoder.java b/src/main/java/org/apache/sysml/runtime/compress/PlanningCoCoder.java
index 07d9757..d1fd7af 100644
--- a/src/main/java/org/apache/sysml/runtime/compress/PlanningCoCoder.java
+++ b/src/main/java/org/apache/sysml/runtime/compress/PlanningCoCoder.java
@@ -24,7 +24,12 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.PriorityQueue;
 import java.util.TreeMap;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
+import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.compress.estim.CompressedSizeEstimator;
 
 public class PlanningCoCoder 
@@ -45,58 +50,110 @@ public class PlanningCoCoder
         * @param numRows
         * @param sparsity
         * @return
+        * @throws DMLRuntimeException 
         */
        public static List<int[]> 
findCocodesByPartitioning(CompressedSizeEstimator sizeEstimator, List<Integer> 
availCols, 
-                       List<Integer> colsCardinalities,List<Long> 
compressedSize, int numRows, double sparsity) 
+                       List<Integer> colsCardinalities, List<Long> 
compressedSize, int numRows, double sparsity, int k) 
+               throws DMLRuntimeException 
        {
-               float numRowsWeight = numRows;
                List<int[]> retGroups = new ArrayList<int[]>();
+               
                // filtering out non-groupable columns as singleton groups
-               int numCols = availCols.size();
-               List<Integer> groupabaleCols = new ArrayList<Integer>();
                // weighted of each column is the ratio of its cardinality to 
the number
                // of rows scaled by the matrix sparsity
-               List<Float> groupabaleColWeights = new ArrayList<Float>();
-               HashMap<Integer, GroupableColInfo> groupableColsInfo = new 
HashMap<Integer, GroupableColInfo>();
+               int numCols = availCols.size();
+               List<Integer> groupCols = new ArrayList<Integer>();
+               List<Float> groupColWeights = new ArrayList<Float>();
+               HashMap<Integer, GroupableColInfo> groupColsInfo = new 
HashMap<Integer, GroupableColInfo>();
                for (int i = 0; i < numCols; i++) {
                        int colIx = availCols.get(i);
                        int cardinality = colsCardinalities.get(i);
-                       float weight = ((float) cardinality) / numRowsWeight;
+                       float weight = ((float) cardinality) / numRows;
                        if (weight <= GROUPABILITY_THRESHOLD) {
-                               groupabaleCols.add(colIx);
-                               groupabaleColWeights.add(weight);
-                               groupableColsInfo.put(colIx, new 
GroupableColInfo(weight,
-                                               compressedSize.get(i)));
+                               groupCols.add(colIx);
+                               groupColWeights.add(weight);
+                               groupColsInfo.put(colIx, new 
GroupableColInfo(weight,compressedSize.get(i)));
                        } else {
                                retGroups.add(new int[] { colIx });
                        }
                }
+               
                // bin packing based on PARTITION_WEIGHT and column weights
                float weight = computeWeightForCoCoding(numRows, sparsity);
                TreeMap<Float, List<List<Integer>>> bins = new 
PlanningBinPacker(
-                               weight, groupabaleCols, groupabaleColWeights) 
-                               .packFirstFit();
+                               weight, groupCols, 
groupColWeights).packFirstFit();
 
                // brute force grouping within each partition
+               retGroups.addAll( (k > 1) ?
+                               getCocodingGroupsBruteForce(bins, 
groupColsInfo, sizeEstimator, numRows, k) :
+                               getCocodingGroupsBruteForce(bins, 
groupColsInfo, sizeEstimator, numRows));
+                       
+               return retGroups;
+       }
+       
+       /**
+        * Brute-force co-codes the columns of each bin sequentially (single-threaded variant).
+        * @param bins partitions of groupable column indices; each inner list is one bin whose columns are co-coded together
+        * @param groupColsInfo per-column info, looked up for every column of every bin to build the singleton groups
+        * @param estim size estimator passed to findCocodesBruteForce
+        * @param rlen number of rows (numRows at the call site)
+        * @return column-index groups of all bins, concatenated in bin iteration order
+        */
+       private static List<int[]> getCocodingGroupsBruteForce(TreeMap<Float, 
List<List<Integer>>> bins, HashMap<Integer, GroupableColInfo> groupColsInfo, 
CompressedSizeEstimator estim, int rlen) 
+       {
+               List<int[]> retGroups = new ArrayList<int[]>();         
                for (List<List<Integer>> binList : bins.values()) {
                        for (List<Integer> bin : binList) {
                                // building an array of singleton CoCodingGroup
-                               PlanningCoCodingGroup[] singltonGroups = new 
PlanningCoCodingGroup[bin.size()];
-                               int i = 0;
-                               GroupableColInfo colInfo;
-                               for (Integer col : bin) {
-                                       colInfo = groupableColsInfo.get(col);
-                                       singltonGroups[i++] = new 
PlanningCoCodingGroup(col, colInfo.size,
-                                                       colInfo.cardRatio);
-                               }
+                               ArrayList<PlanningCoCodingGroup> sgroups = new 
ArrayList<PlanningCoCodingGroup>();
+                               for (Integer col : bin)
+                                       sgroups.add(new 
PlanningCoCodingGroup(col, groupColsInfo.get(col)));
+                               // brute force co-coding        
                                PlanningCoCodingGroup[] outputGroups = 
findCocodesBruteForce(
-                                               sizeEstimator, numRowsWeight, 
singltonGroups);
-                               
-                               for (PlanningCoCodingGroup grp : outputGroups) {
+                                               estim, rlen, 
sgroups.toArray(new PlanningCoCodingGroup[0]));
+                               for (PlanningCoCodingGroup grp : outputGroups)
                                        retGroups.add(grp.getColIndices());
-                               }
                        }
                }
+               
+               return retGroups;
+       }
+       
+       /**
+        * Brute-force co-codes the columns of each bin in parallel (one task per bin) on a pool of k threads.
+        * @param bins partitions of groupable column indices; each inner list is one bin whose columns are co-coded together
+        * @param groupColsInfo per-column info, looked up for every column of every bin to build the singleton groups
+        * @param estim size estimator shared by all co-coding tasks
+        * @param rlen number of rows (numRows at the call site)
+        * @param k number of worker threads (the call site only uses this variant for k > 1)
+        * @return column-index groups of all bins, concatenated in bin iteration order
+        * @throws DMLRuntimeException if a co-coding task fails or the calling thread is interrupted
+        */
+       private static List<int[]> getCocodingGroupsBruteForce(TreeMap<Float, List<List<Integer>>> bins, HashMap<Integer, GroupableColInfo> groupColsInfo, CompressedSizeEstimator estim, int rlen, int k) 
+               throws DMLRuntimeException 
+       {
+               List<int[]> retGroups = new ArrayList<int[]>();
+               ExecutorService pool = Executors.newFixedThreadPool( k );
+               try {
+                       ArrayList<CocodeTask> tasks = new ArrayList<CocodeTask>();
+                       for (List<List<Integer>> binList : bins.values())
+                               for (List<Integer> bin : binList) {
+                                       // building an array of singleton CoCodingGroup
+                                       ArrayList<PlanningCoCodingGroup> sgroups = new ArrayList<PlanningCoCodingGroup>();
+                                       for (Integer col : bin)
+                                               sgroups.add(new PlanningCoCodingGroup(col, groupColsInfo.get(col)));
+                                       tasks.add(new CocodeTask(estim, sgroups, rlen));
+                               }
+                       List<Future<PlanningCoCodingGroup[]>> rtask = pool.invokeAll(tasks); //futures in task (bin) order
+                       for( Future<PlanningCoCodingGroup[]> lrtask : rtask )
+                               for (PlanningCoCodingGroup grp : lrtask.get())
+                                       retGroups.add(grp.getColIndices());
+               } catch(Exception ex) {
+                       if( ex instanceof InterruptedException ) Thread.currentThread().interrupt(); //restore interrupt flag
+                       throw new DMLRuntimeException(ex);
+               } finally {
+                       pool.shutdown(); //always release worker threads, also on failure
+               }
                return retGroups;
        }
 
@@ -215,7 +272,7 @@ public class PlanningCoCoder
        /**
         * 
         */
-       private static class GroupableColInfo {
+       protected static class GroupableColInfo {
                float cardRatio;
                long size;
 
@@ -224,4 +281,27 @@ public class PlanningCoCoder
                        size = lsize;
                }
        }
+       
+       /**
+        * Callable that runs findCocodesBruteForce on one list of singleton co-coding groups (one bin per task at the call site).
+        */
+       private static class CocodeTask implements Callable<PlanningCoCodingGroup[]> 
+       {
+               private final CompressedSizeEstimator _estim;
+               private final ArrayList<PlanningCoCodingGroup> _sgroups;
+               private final int _rlen;
+               
+               protected CocodeTask( CompressedSizeEstimator estim, ArrayList<PlanningCoCodingGroup> sgroups, int rlen )  {
+                       _estim = estim;
+                       _sgroups = sgroups;
+                       _rlen = rlen;
+               }
+               
+               @Override
+               public PlanningCoCodingGroup[] call() throws DMLRuntimeException {
+                       // brute force co-coding of this task's singleton groups
+                       return findCocodesBruteForce(_estim, _rlen, 
+                                       _sgroups.toArray(new PlanningCoCodingGroup[0]));
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/b36fb29e/src/main/java/org/apache/sysml/runtime/compress/PlanningCoCodingGroup.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/PlanningCoCodingGroup.java b/src/main/java/org/apache/sysml/runtime/compress/PlanningCoCodingGroup.java
index 6553c20..a1e0123 100644
--- a/src/main/java/org/apache/sysml/runtime/compress/PlanningCoCodingGroup.java
+++ b/src/main/java/org/apache/sysml/runtime/compress/PlanningCoCodingGroup.java
@@ -21,6 +21,7 @@ package org.apache.sysml.runtime.compress;
 
 import java.util.Arrays;
 
+import org.apache.sysml.runtime.compress.PlanningCoCoder.GroupableColInfo;
 import org.apache.sysml.runtime.compress.estim.CompressedSizeEstimator;
 import org.apache.sysml.runtime.compress.estim.CompressedSizeInfo;
 
@@ -38,10 +39,10 @@ public class PlanningCoCodingGroup
         * Constructor for a one-column group; i.e. do not co-code a given 
column.
         * 
         */
-       public PlanningCoCodingGroup(int col, long estSize, float cardRatio) {
+       public PlanningCoCodingGroup(int col, GroupableColInfo info) {
                _colIndexes = new int[]{col};
-               _estSize = estSize;
-               _cardRatio = cardRatio;
+               _estSize = info.size;
+               _cardRatio = info.cardRatio;
        }
 
        /**

Reply via email to