[SYSTEMML-821] Multi-threaded matrix block compression, tests

This patch enables multi-threaded matrix block compression in the CP
compression instruction. In detail, this is realized via (1)
multi-threaded transpose, (2) multi-threaded column classification, and
(3) multi-threaded column compression. Down the road we will also add
multi-threaded column co-coding but this requires some major refactoring
first. On the sparse Imagenet dataset (1262102x900) the individual
performance improvements were as follows (compression phases 1-4)
leading to an overall improvement of 4.5x:

(0) baseline: 22.6s, 4.6s, 26.9s, 0.3s,
(1) transpose: 7.2s, 4.6s, 26.9s, 0.3s,
(2) classify: 4.0s, 4.6s, 26.9s, 0.3s,
(3) grouping: 3.8s, 4.7s, 3.4s, 0.3s.

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/873bae76
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/873bae76
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/873bae76

Branch: refs/heads/master
Commit: 873bae76bad9267b2c710404f7a08707ca76ca18
Parents: 44c8f9d
Author: Matthias Boehm <[email protected]>
Authored: Thu Jul 28 02:01:07 2016 -0700
Committer: Matthias Boehm <[email protected]>
Committed: Thu Jul 28 02:01:07 2016 -0700

----------------------------------------------------------------------
 .../runtime/compress/CompressedMatrixBlock.java | 350 ++++++++++++++-----
 .../cp/CompressionCPInstruction.java            |   3 +-
 .../runtime/matrix/data/LibMatrixReorg.java     |   2 +-
 .../functions/compress/ParCompressionTest.java  | 169 +++++++++
 .../functions/compress/ZPackageSuite.java       |   1 +
 5 files changed, 428 insertions(+), 97 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/873bae76/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java 
b/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
index 81d933d..f2ccb43 100644
--- a/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
@@ -193,8 +193,22 @@ public class CompressedMatrixBlock extends MatrixBlock 
implements Externalizable
         * which should be fixed if we move ahead with this compression 
strategy.
         * 
         * +per column sparsity
+        * 
+        * @throws DMLRuntimeException
         */
        public void compress() 
+               throws DMLRuntimeException
+       {
+               //default sequential execution
+               compress(1);
+       }
+       
+       /**
+        * 
+        * @param k  number of threads
+        * @throws DMLRuntimeException
+        */
+       public void compress(int k) 
                throws DMLRuntimeException 
        {
                //check for redundant compression
@@ -216,11 +230,8 @@ public class CompressedMatrixBlock extends MatrixBlock 
implements Externalizable
                final int numRows = getNumRows();
                final int numCols = getNumColumns();
                final boolean sparse = isInSparseFormat();
-               MatrixBlock rawblock = this;
-               if( TRANSPOSE_INPUT )
-                       rawblock = LibMatrixReorg.transpose(rawblock, new 
MatrixBlock(numCols, numRows, sparse));
-               else
-                       rawblock = new MatrixBlock(this);
+               MatrixBlock rawblock = !TRANSPOSE_INPUT ? new MatrixBlock(this) 
:
+                       LibMatrixReorg.transpose(this, new MatrixBlock(numCols, 
numRows, sparse), k);
                
                //construct sample-based size estimator
                CompressedSizeEstimator bitmapSizeEstimator = 
@@ -234,18 +245,12 @@ public class CompressedMatrixBlock extends MatrixBlock 
implements Externalizable
 
                // We start with a full set of columns.
                HashSet<Integer> remainingCols = new HashSet<Integer>();
-               for (int i = 0; i < numCols; i++) {
+               for (int i = 0; i < numCols; i++)
                        remainingCols.add(i);
-               }
 
                // PHASE 1: Classify columns by compression type
-               // We start by determining which columns are amenable to bitmap
-               // compression
-
-               // It is correct to use the dense size as the uncompressed size
-               // FIXME not numRows but nnz / col otherwise too aggressive 
overestimation
-               // of uncompressed size and hence overestimation of compression 
potential
-               double uncompressedColumnSize = 8 * numRows;
+               // We start by determining which columns are amenable to bitmap 
compression
+               double uncompressedColumnSize = getUncompressedSize(numRows, 1);
 
                // information about the bitmap amenable columns
                List<Integer> bitmapCols = new ArrayList<Integer>();
@@ -256,11 +261,12 @@ public class CompressedMatrixBlock extends MatrixBlock 
implements Externalizable
                
                // Minimum ratio (size of uncompressed / size of compressed) 
that we
                // will accept when encoding a field with a bitmap.
+               CompressedSizeInfo[] sizeInfos = (k > 1) ?
+                               computeCompressedSizeInfos(bitmapSizeEstimator, 
numCols, k) : 
+                               computeCompressedSizeInfos(bitmapSizeEstimator, 
numCols);               
                for (int col = 0; col < numCols; col++) 
-               {
-                       CompressedSizeInfo compressedSizeInfo = 
bitmapSizeEstimator
-                                       .estimateCompressedColGroupSize(new 
int[] { col });
-                       long compressedSize = compressedSizeInfo.getMinSize();
+               {       
+                       long compressedSize = sizeInfos[col].getMinSize();
                        double compRatio = uncompressedColumnSize / 
compressedSize;
                        
                        //FIXME: compression ratio should be checked against 1 
instead of min compression
@@ -269,7 +275,7 @@ public class CompressedMatrixBlock extends MatrixBlock 
implements Externalizable
                        if (compRatio >= MIN_COMPRESSION_RATIO) {
                                bitmapCols.add(col);
                                compressionRatios.put(col, compRatio);
-                               
colsCardinalities.add(compressedSizeInfo.getEstCarinality());
+                               
colsCardinalities.add(sizeInfos[col].getEstCarinality());
                                compressedSizes.add(compressedSize);
                        }
                        else
@@ -313,77 +319,15 @@ public class CompressedMatrixBlock extends MatrixBlock 
implements Externalizable
                }
                
                // PHASE 3: Compress and correct sample-based decisions
-               
-               for (int[] groupIndices : bitmapColGrps) 
-               {
-                       int[] allGroupIndices = null;
-                       int allColsCount = groupIndices.length;
-                       CompressedSizeInfo bitmapSizeInfo;
-                       // The compression type is decided based on a full 
bitmap since it
-                       // will be reused for the actual compression step.
-                       UncompressedBitmap ubm;
-                       PriorityQueue<CompressedColumn> compRatioPQ = null;
-                       boolean skipGroup = false;
-                       while (true) 
-                       {
-                               ubm = BitmapEncoder.extractBitmap(groupIndices, 
rawblock); 
-                               bitmapSizeInfo = bitmapSizeEstimator
-                                               
.estimateCompressedColGroupSize(ubm);
-                               double compRatio = uncompressedColumnSize * 
groupIndices.length
-                                               / bitmapSizeInfo.getMinSize();
-                               if (compRatio >= MIN_COMPRESSION_RATIO) {
-                                       // we have a good group
-                                       for( Integer col : groupIndices )
-                                               remainingCols.remove(col);
-                                       break;
-                               } else {
-                                       // modify the group
-                                       if (compRatioPQ == null) {
-                                               // first modification
-                                               allGroupIndices = 
Arrays.copyOf(groupIndices, groupIndices.length);
-                                               compRatioPQ = new 
PriorityQueue<CompressedMatrixBlock.CompressedColumn>();
-                                               for (int i = 0; i < 
groupIndices.length; i++)
-                                                       compRatioPQ.add(new 
CompressedColumn(i,
-                                                                       
compressionRatios.get(groupIndices[i])));
-                                       }
-
-                                       // index in allGroupIndices
-                                       int removeIx = compRatioPQ.poll().colIx;
-                                       allGroupIndices[removeIx] = -1;
-                                       allColsCount--;
-                                       if (allColsCount == 0) {
-                                               skipGroup = true;
-                                               break;
-                                       }
-                                       groupIndices = new int[allColsCount];
-                                       // copying the values that do not equal 
-1
-                                       int ix = 0;
-                                       for (int col : allGroupIndices) {
-                                               if (col != -1) {
-                                                       groupIndices[ix++] = 
col;
-                                               }
-                                       }
-
-                               }
+               ColGroup[] colGroups = (k > 1) ?
+                               compressColGroups(rawblock, 
bitmapSizeEstimator, compressionRatios, numRows, bitmapColGrps, k) : 
+                               compressColGroups(rawblock, 
bitmapSizeEstimator, compressionRatios, numRows, bitmapColGrps);    
+               for( int j=0; j<colGroups.length; j++ ) {
+                       if( colGroups[j] != null ) {
+                               for( int col : colGroups[j].getColIndices() )
+                                       remainingCols.remove(col);
+                               _colGroups.add(colGroups[j]);
                        }
-
-                       if (skipGroup)
-                               continue;
-                       long rleNumBytes = bitmapSizeInfo.getRLESize();
-                       long offsetNumBytes = bitmapSizeInfo.getOLESize();
-                       double rleRatio = (double) offsetNumBytes / (double) 
rleNumBytes;
-
-                       if (rleRatio > MIN_RLE_RATIO) {
-                               ColGroupRLE compressedGroup = new 
ColGroupRLE(groupIndices,
-                                               numRows, ubm);
-                               _colGroups.add(compressedGroup);
-                       } 
-                       else {
-                               ColGroupOLE compressedGroup = new ColGroupOLE(
-                                               groupIndices, numRows, ubm);
-                               _colGroups.add(compressedGroup);
-                       }
-
                }
                
                _stats.timePhase3 = time.stop();
@@ -407,6 +351,182 @@ public class CompressedMatrixBlock extends MatrixBlock 
implements Externalizable
                        LOG.debug("compression phase 4: "+_stats.timePhase4);
        }
 
+       public CompressionStatistics getCompressionStatistics() {
+               return _stats;
+       }
+
+       /**
+        * 
+        * @param estim
+        * @param clen
+        * @return
+        */
+       private static CompressedSizeInfo[] 
computeCompressedSizeInfos(CompressedSizeEstimator estim, int clen) {
+               CompressedSizeInfo[] ret = new CompressedSizeInfo[clen];
+               for( int col=0; col<clen; col++ )
+                       ret[col] = estim.estimateCompressedColGroupSize(new 
int[] { col });
+               return ret;
+       }
+       
+       /**
+        * 
+        * @param estim
+        * @param clen
+        * @param k
+        * @return
+        * @throws DMLRuntimeException
+        */
+       private static CompressedSizeInfo[] 
computeCompressedSizeInfos(CompressedSizeEstimator estim, int clen, int k) 
+               throws DMLRuntimeException 
+       {       
+               try {
+                       ExecutorService pool = Executors.newFixedThreadPool( k 
);
+                       ArrayList<SizeEstimTask> tasks = new 
ArrayList<SizeEstimTask>();
+                       for( int col=0; col<clen; col++ )
+                               tasks.add(new SizeEstimTask(estim, col));
+                       List<Future<CompressedSizeInfo>> rtask = 
pool.invokeAll(tasks); 
+                       ArrayList<CompressedSizeInfo> ret = new 
ArrayList<CompressedSizeInfo>();
+                       for( Future<CompressedSizeInfo> lrtask : rtask )
+                               ret.add(lrtask.get());
+                       pool.shutdown();
+                       return ret.toArray(new CompressedSizeInfo[0]);
+               }
+               catch(Exception ex) {
+                       throw new DMLRuntimeException(ex);
+               }
+       }
+
+       /**
+        * 
+        * @param in
+        * @param estim
+        * @param compRatios
+        * @param rlen
+        * @param groups
+        * @return
+        */
+       private static ColGroup[] compressColGroups(MatrixBlock in, 
CompressedSizeEstimator estim, HashMap<Integer, Double> compRatios, int rlen, 
List<int[]> groups)
+       {
+               ColGroup[] ret = new ColGroup[groups.size()];
+               for( int i=0; i<groups.size(); i++ )
+                       ret[i] = compressColGroup(in, estim, compRatios, rlen, 
groups.get(i));
+               
+               return ret;
+       }
+       
+       /**
+        * 
+        * @param in
+        * @param estim
+        * @param compRatios
+        * @param rlen
+        * @param groups
+        * @param k
+        * @return
+        * @throws DMLRuntimeException 
+        */
+       private static ColGroup[] compressColGroups(MatrixBlock in, 
CompressedSizeEstimator estim, HashMap<Integer, Double> compRatios, int rlen, 
List<int[]> groups, int k) 
+               throws DMLRuntimeException
+       {
+               try {
+                       ExecutorService pool = Executors.newFixedThreadPool( k 
);
+                       ArrayList<CompressTask> tasks = new 
ArrayList<CompressTask>();
+                       for( int[] colIndexes : groups )
+                               tasks.add(new CompressTask(in, estim, 
compRatios, rlen, colIndexes));
+                       List<Future<ColGroup>> rtask = pool.invokeAll(tasks);   
+                       ArrayList<ColGroup> ret = new ArrayList<ColGroup>();
+                       for( Future<ColGroup> lrtask : rtask )
+                               ret.add(lrtask.get());
+                       pool.shutdown();
+                       return ret.toArray(new ColGroup[0]);
+               }
+               catch(Exception ex) {
+                       throw new DMLRuntimeException(ex);
+               }
+       }
+       
+       /**
+        * 
+        * @param in
+        * @param estim
+        * @param compRatios
+        * @param rlen
+        * @param colIndexes
+        * @return
+        */
+       private static ColGroup compressColGroup(MatrixBlock in, 
CompressedSizeEstimator estim, HashMap<Integer, Double> compRatios, int rlen, 
int[] colIndexes) 
+       {
+               int[] allGroupIndices = null;
+               int allColsCount = colIndexes.length;
+               CompressedSizeInfo sizeInfo;
+               // The compression type is decided based on a full bitmap since 
it
+               // will be reused for the actual compression step.
+               UncompressedBitmap ubm = null;
+               PriorityQueue<CompressedColumn> compRatioPQ = null;
+               boolean skipGroup = false;
+               while (true) 
+               {
+                       //exact big list and observe compression ratio
+                       ubm = BitmapEncoder.extractBitmap(colIndexes, in); 
+                       sizeInfo = estim.estimateCompressedColGroupSize(ubm);   
        
+                       double compRatio = getUncompressedSize(rlen, 
colIndexes.length) / sizeInfo.getMinSize();
+                       
+                       if (compRatio >= MIN_COMPRESSION_RATIO) {
+                               break; // we have a good group
+                       } 
+                       
+                       // modify the group
+                       if (compRatioPQ == null) {
+                               // first modification
+                               allGroupIndices = Arrays.copyOf(colIndexes, 
colIndexes.length);
+                               compRatioPQ = new 
PriorityQueue<CompressedMatrixBlock.CompressedColumn>();
+                               for (int i = 0; i < colIndexes.length; i++)
+                                       compRatioPQ.add(new CompressedColumn(i, 
compRatios.get(colIndexes[i])));
+                       }
+
+                       // index in allGroupIndices
+                       int removeIx = compRatioPQ.poll().colIx;
+                       allGroupIndices[removeIx] = -1;
+                       allColsCount--;
+                       if (allColsCount == 0) {
+                               skipGroup = true;
+                               break;
+                       }
+                       colIndexes = new int[allColsCount];
+                       // copying the values that do not equal -1
+                       int ix = 0;
+                       for (int col : allGroupIndices)
+                               if (col != -1)
+                                       colIndexes[ix++] = col;
+               }
+
+               //add group to uncompressed fallback
+               if( skipGroup )
+                       return null;
+
+               //create compressed column group
+               long rleNumBytes = sizeInfo.getRLESize();
+               long offsetNumBytes = sizeInfo.getOLESize();
+               double rleRatio = (double) offsetNumBytes / (double) 
rleNumBytes;
+               if (rleRatio > MIN_RLE_RATIO)
+                       return new ColGroupRLE(colIndexes, rlen, ubm);
+               else
+                       return new ColGroupOLE(colIndexes, rlen, ubm);
+       }
+       
+       /**
+        * 
+        * @param rlen
+        * @param clen
+        * @return
+        */
+       private static double getUncompressedSize(int rlen, int clen) {
+               // It is correct to use the dense size as the uncompressed size
+               // FIXME not numRows but nnz / col otherwise too aggressive 
overestimation
+               // of uncompressed size and hence overestimation of compression 
potential
+               return 8 * rlen * clen;
+       }
+
        /**
         * @return a new uncompressed matrix block containing the contents of 
this
         *         block
@@ -439,11 +559,7 @@ public class CompressedMatrixBlock extends MatrixBlock 
implements Externalizable
 
                return ret;
        }
-
-       public CompressionStatistics getCompressionStatistics(){
-               return _stats;
-       }
-
+       
        /**
         * 
         * @return an upper bound on the memory used to store this compressed 
block
@@ -463,7 +579,7 @@ public class CompressedMatrixBlock extends MatrixBlock 
implements Externalizable
                return total;
        }
 
-       private class CompressedColumn implements Comparable<CompressedColumn> {
+       private static class CompressedColumn implements 
Comparable<CompressedColumn> {
                int colIx;
                double compRatio;
 
@@ -1372,6 +1488,50 @@ public class CompressedMatrixBlock extends MatrixBlock 
implements Externalizable
                }
        }
        
+       /**
+        * 
+        */
+       private static class SizeEstimTask implements 
Callable<CompressedSizeInfo> 
+       {
+               private CompressedSizeEstimator _estim = null;
+               private int _col = -1;
+               
+               protected SizeEstimTask( CompressedSizeEstimator estim, int col 
)  {
+                       _estim = estim;
+                       _col = col;
+               }
+               
+               @Override
+               public CompressedSizeInfo call() throws DMLRuntimeException {
+                       return _estim.estimateCompressedColGroupSize(new int[] 
{ _col });
+               }
+       }
+       
+       /**
+        *
+        */
+       private static class CompressTask implements Callable<ColGroup> 
+       {
+               private MatrixBlock _in = null;
+               private CompressedSizeEstimator _estim = null;
+               private HashMap<Integer, Double> _compRatios = null;
+               private int _rlen = -1;
+               private int[] _colIndexes = null;
+               
+               protected CompressTask( MatrixBlock in, CompressedSizeEstimator 
estim, HashMap<Integer, Double> compRatios, int rlen, int[] colIndexes )  {
+                       _in = in;
+                       _estim = estim;
+                       _compRatios = compRatios;
+                       _rlen = rlen;
+                       _colIndexes = colIndexes;
+               }
+               
+               @Override
+               public ColGroup call() throws DMLRuntimeException {
+                       return compressColGroup(_in, _estim, _compRatios, 
_rlen, _colIndexes);
+               }
+       }
+       
        //////////////////////////////////////////
        // Graceful fallback to uncompressed linear algebra
        

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/873bae76/src/main/java/org/apache/sysml/runtime/instructions/cp/CompressionCPInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/cp/CompressionCPInstruction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/cp/CompressionCPInstruction.java
index 333bfbb..5230945 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/cp/CompressionCPInstruction.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/cp/CompressionCPInstruction.java
@@ -19,6 +19,7 @@
 
 package org.apache.sysml.runtime.instructions.cp;
 
+import org.apache.sysml.hops.OptimizerUtils;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.compress.CompressedMatrixBlock;
 import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
@@ -54,7 +55,7 @@ public class CompressionCPInstruction extends 
UnaryCPInstruction
                
                //compress the matrix block
                CompressedMatrixBlock cmb = new CompressedMatrixBlock(in);
-               cmb.compress();
+               cmb.compress(OptimizerUtils.getConstrainedNumThreads(-1));
                
                //set output and release input
                ec.releaseMatrixInput(input1.getName());

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/873bae76/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixReorg.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixReorg.java 
b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixReorg.java
index 59663b5..a2e1252 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixReorg.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixReorg.java
@@ -185,7 +185,7 @@ public class LibMatrixReorg
                throws DMLRuntimeException
        {
                //redirect small or special cases to sequential execution
-               if( in.isEmptyBlock(false) || (in.rlen * in.clen < 
PAR_NUMCELL_THRESHOLD)
+               if( in.isEmptyBlock(false) || (in.rlen * in.clen < 
PAR_NUMCELL_THRESHOLD) || k == 1
                        || (SHALLOW_DENSE_VECTOR_TRANSPOSE && !in.sparse && 
!out.sparse && (in.rlen==1 || in.clen==1) )
                        || (in.sparse && !out.sparse && in.rlen==1) || 
(!in.sparse && out.sparse && in.rlen==1) 
                        || (!in.sparse && out.sparse) || !out.isThreadSafe())

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/873bae76/src/test/java/org/apache/sysml/test/integration/functions/compress/ParCompressionTest.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/sysml/test/integration/functions/compress/ParCompressionTest.java
 
b/src/test/java/org/apache/sysml/test/integration/functions/compress/ParCompressionTest.java
new file mode 100644
index 0000000..e0fe847
--- /dev/null
+++ 
b/src/test/java/org/apache/sysml/test/integration/functions/compress/ParCompressionTest.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.test.integration.functions.compress;
+
+import org.apache.sysml.runtime.compress.CompressedMatrixBlock;
+import 
org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+import org.apache.sysml.runtime.util.DataConverter;
+import org.apache.sysml.test.integration.AutomatedTestBase;
+import org.apache.sysml.test.utils.TestUtils;
+import org.junit.Test;
+
+/**
+ * 
+ */
+public class ParCompressionTest extends AutomatedTestBase
+{      
+       private static final int rows = 1023;
+       private static final int cols = 20;
+       private static final double sparsity1 = 0.9;
+       private static final double sparsity2 = 0.1;
+       private static final double sparsity3 = 0.0;
+       
+       public enum SparsityType {
+               DENSE,
+               SPARSE,
+               EMPTY,
+       }
+       
+       public enum ValueType {
+               RAND,
+               RAND_ROUND,
+               CONST,
+       }
+       
+       @Override
+       public void setUp() {
+               
+       }
+       
+       @Test
+       public void testDenseRandDataCompression() {
+               runCompressionTest(SparsityType.DENSE, ValueType.RAND, true);
+       }
+       
+       @Test
+       public void testSparseRandDataCompression() {
+               runCompressionTest(SparsityType.SPARSE, ValueType.RAND, true);
+       }
+       
+       @Test
+       public void testEmptyCompression() {
+               runCompressionTest(SparsityType.EMPTY, ValueType.RAND, true);
+       }
+       
+       @Test
+       public void testDenseRoundRandDataCompression() {
+               runCompressionTest(SparsityType.DENSE, ValueType.RAND_ROUND, 
true);
+       }
+       
+       @Test
+       public void testSparseRoundRandDataCompression() {
+               runCompressionTest(SparsityType.SPARSE, ValueType.RAND_ROUND, 
true);
+       }
+       
+       @Test
+       public void testDenseConstantDataCompression() {
+               runCompressionTest(SparsityType.DENSE, ValueType.CONST, true);
+       }
+       
+       @Test
+       public void testSparseConstDataCompression() {
+               runCompressionTest(SparsityType.SPARSE, ValueType.CONST, true);
+       }
+       
+       @Test
+       public void testDenseRandDataNoCompression() {
+               runCompressionTest(SparsityType.DENSE, ValueType.RAND, false);
+       }
+       
+       @Test
+       public void testSparseRandDataNoCompression() {
+               runCompressionTest(SparsityType.SPARSE, ValueType.RAND, false);
+       }
+       
+       @Test
+       public void testEmptyNoCompression() {
+               runCompressionTest(SparsityType.EMPTY, ValueType.RAND, false);
+       }
+       
+       @Test
+       public void testDenseRoundRandDataNoCompression() {
+               runCompressionTest(SparsityType.DENSE, ValueType.RAND_ROUND, 
false);
+       }
+       
+       @Test
+       public void testSparseRoundRandDataNoCompression() {
+               runCompressionTest(SparsityType.SPARSE, ValueType.RAND_ROUND, 
false);
+       }
+       
+       @Test
+       public void testDenseConstDataNoCompression() {
+               runCompressionTest(SparsityType.DENSE, ValueType.CONST, false);
+       }
+       
+       @Test
+       public void testSparseConstDataNoCompression() {
+               runCompressionTest(SparsityType.SPARSE, ValueType.CONST, false);
+       }
+       
+
+       /**
+        * 
+        * @param mb
+        */
+       private void runCompressionTest(SparsityType sptype, ValueType vtype, 
boolean compress)
+       {
+               try
+               {
+                       //prepare sparsity for input data
+                       double sparsity = -1;
+                       switch( sptype ){
+                               case DENSE: sparsity = sparsity1; break;
+                               case SPARSE: sparsity = sparsity2; break;
+                               case EMPTY: sparsity = sparsity3; break;
+                       }
+                       
+                       //generate input data
+                       double min = (vtype==ValueType.CONST)? 10 : -10;
+                       double[][] input = TestUtils.generateTestMatrix(rows, 
cols, min, 10, sparsity, 7);
+                       if( vtype==ValueType.RAND_ROUND )
+                               input = TestUtils.round(input);
+                       MatrixBlock mb = 
DataConverter.convertToMatrixBlock(input);
+                       
+                       //compress given matrix block
+                       CompressedMatrixBlock cmb = new 
CompressedMatrixBlock(mb);
+                       if( compress )
+                               
cmb.compress(InfrastructureAnalyzer.getLocalParallelism());
+                       
+                       //decompress the compressed matrix block
+                       MatrixBlock tmp = cmb.decompress();
+                       
+                       //compare result with input
+                       double[][] d1 = DataConverter.convertToDoubleMatrix(mb);
+                       double[][] d2 = 
DataConverter.convertToDoubleMatrix(tmp);
+                       TestUtils.compareMatrices(d1, d2, rows, cols, 0);
+               }
+               catch(Exception ex) {
+                       throw new RuntimeException(ex);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/873bae76/src/test_suites/java/org/apache/sysml/test/integration/functions/compress/ZPackageSuite.java
----------------------------------------------------------------------
diff --git 
a/src/test_suites/java/org/apache/sysml/test/integration/functions/compress/ZPackageSuite.java
 
b/src/test_suites/java/org/apache/sysml/test/integration/functions/compress/ZPackageSuite.java
index c8dc906..7d19ae9 100644
--- 
a/src/test_suites/java/org/apache/sysml/test/integration/functions/compress/ZPackageSuite.java
+++ 
b/src/test_suites/java/org/apache/sysml/test/integration/functions/compress/ZPackageSuite.java
@@ -43,6 +43,7 @@ import org.junit.runners.Suite;
        LargeMatrixVectorMultTest.class,
        LargeParMatrixVectorMultTest.class,
        LargeVectorMatrixMultTest.class,
+       ParCompressionTest.class,
        ParMatrixMultChainTest.class,
        ParMatrixVectorMultTest.class,
        ParTransposeSelfLeftMatrixMultTest.class,

Reply via email to