http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/16e7b1c8/src/main/java/org/apache/sysml/runtime/compress/ColGroupRLE.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/ColGroupRLE.java 
b/src/main/java/org/apache/sysml/runtime/compress/ColGroupRLE.java
new file mode 100644
index 0000000..39bc162
--- /dev/null
+++ b/src/main/java/org/apache/sysml/runtime/compress/ColGroupRLE.java
@@ -0,0 +1,617 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.compress;
+
+import java.util.Arrays;
+import java.util.Iterator;
+
+import org.apache.sysml.runtime.DMLRuntimeException;
+import org.apache.sysml.runtime.compress.utils.ConverterUtils;
+import org.apache.sysml.runtime.compress.utils.LinearAlgebraUtils;
+import org.apache.sysml.runtime.functionobjects.KahanFunction;
+import org.apache.sysml.runtime.functionobjects.KahanPlus;
+import org.apache.sysml.runtime.functionobjects.KahanPlusSq;
+import org.apache.sysml.runtime.functionobjects.ReduceAll;
+import org.apache.sysml.runtime.functionobjects.ReduceCol;
+import org.apache.sysml.runtime.functionobjects.ReduceRow;
+import org.apache.sysml.runtime.instructions.cp.KahanObject;
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+import org.apache.sysml.runtime.matrix.operators.AggregateUnaryOperator;
+import org.apache.sysml.runtime.matrix.operators.ScalarOperator;
+
+
+/** A group of columns compressed with a single run-length encoded bitmap. */
+public class ColGroupRLE extends ColGroupBitmap 
+{
+       private static final long serialVersionUID = 7450232907594748177L;
+
+       public ColGroupRLE() {
+               super(CompressionType.RLE_BITMAP);
+       }
+       
+       /**
+        * Main constructor. Constructs and stores the necessary bitmaps.
+        * 
+        * @param colIndices
+        *            indices (within the block) of the columns included in this
+        *            column
+        * @param numRows
+        *            total number of rows in the parent block
+        * @param ubm
+        *            Uncompressed bitmap representation of the block
+        */
+       public ColGroupRLE(int[] colIndices, int numRows, UncompressedBitmap 
ubm) 
+       {
+               super(CompressionType.RLE_BITMAP, colIndices, numRows, ubm);
+               
+               // compress the bitmaps
+               final int numVals = ubm.getNumValues();
+               char[][] lbitmaps = new char[numVals][];
+               int totalLen = 0;
+               for( int i=0; i<numVals; i++ ) {
+                       lbitmaps[i] = 
BitmapEncoder.genRLEBitmap(ubm.getOffsetsList(i));
+                       totalLen += lbitmaps[i].length;
+               }
+               
+               // compact bitmaps to linearized representation
+               createCompressedBitmaps(numVals, totalLen, lbitmaps);
+       }
+
+       /**
+        * Constructor for internal use.
+        */
+       public ColGroupRLE(int[] colIndices, int numRows, double[] values, 
char[] bitmaps, int[] bitmapOffs) {
+               super(CompressionType.RLE_BITMAP, colIndices, numRows, values);
+               _data = bitmaps;
+               _ptr = bitmapOffs;
+       }
+
+       @Override
+       public Iterator<Integer> getDecodeIterator(int bmpIx) {
+               return new BitmapDecoderRLE(_data, _ptr[bmpIx], len(bmpIx)); 
+       }
+       
+       @Override
+       public void decompressToBlock(MatrixBlock target) 
+       {
+               if( LOW_LEVEL_OPT && getNumValues() > 1 )
+               {
+                       final int blksz = 128 * 1024;
+                       final int numCols = getNumCols();
+                       final int numVals = getNumValues();
+                       final int n = getNumRows();
+                       
+                       //position and start offset arrays
+                       int[] apos = new int[numVals];
+                       int[] astart = new int[numVals];
+                       
+                       //cache conscious append via horizontal scans 
+                       for( int bi=0; bi<n; bi+=blksz ) {
+                               int bimax = Math.min(bi+blksz, n);              
                        
+                               for (int k=0, off=0; k < numVals; k++, 
off+=numCols) {
+                                       int bitmapOff = _ptr[k];
+                                       int bitmapLen = len(k);
+                                       int bufIx = apos[k];
+                                       int start = astart[k];
+                                       for( ; bufIx<bitmapLen & start<bimax; 
bufIx+=2) {
+                                               start += _data[bitmapOff + 
bufIx];
+                                               int len = _data[bitmapOff + 
bufIx+1];
+                                               for( int i=start; i<start+len; 
i++ )
+                                                       for( int j=0; 
j<numCols; j++ )
+                                                               if( 
_values[off+j]!=0 )
+                                                                       
target.appendValue(i, _colIndexes[j], _values[off+j]);
+                                               start += len;
+                                       }
+                                       apos[k] = bufIx;        
+                                       astart[k] = start;
+                               }
+                       }
+               }
+               else
+               {
+                       //call generic decompression with decoder
+                       super.decompressToBlock(target);
+               }
+       }
+
+       @Override
+       public void decompressToBlock(MatrixBlock target, int[] colixTargets) 
+       {
+               if( LOW_LEVEL_OPT && getNumValues() > 1 )
+               {
+                       final int blksz = 128 * 1024;
+                       final int numCols = getNumCols();
+                       final int numVals = getNumValues();
+                       final int n = getNumRows();
+                       
+                       //position and start offset arrays
+                       int[] apos = new int[numVals];
+                       int[] astart = new int[numVals];
+                       int[] cix = new int[numCols];
+                       
+                       //prepare target col indexes
+                       for( int j=0; j<numCols; j++ )
+                               cix[j] = colixTargets[_colIndexes[j]];
+                       
+                       //cache conscious append via horizontal scans 
+                       for( int bi=0; bi<n; bi+=blksz ) {
+                               int bimax = Math.min(bi+blksz, n);              
                        
+                               for (int k=0, off=0; k < numVals; k++, 
off+=numCols) {
+                                       int bitmapOff = _ptr[k];
+                                       int bitmapLen = len(k);
+                                       int bufIx = apos[k];
+                                       if( bufIx >= bitmapLen ) 
+                                               continue;
+                                       int start = astart[k];
+                                       for( ; bufIx<bitmapLen & start<bimax; 
bufIx+=2) {
+                                               start += _data[bitmapOff + 
bufIx];
+                                               int len = _data[bitmapOff + 
bufIx+1];
+                                               for( int i=start; i<start+len; 
i++ )
+                                                       for( int j=0; 
j<numCols; j++ )
+                                                               if( 
_values[off+j]!=0 )
+                                                                       
target.appendValue(i, cix[j], _values[off+j]);
+                                               start += len;
+                                       }
+                                       apos[k] = bufIx;        
+                                       astart[k] = start;
+                               }
+                       }
+               }
+               else
+               {
+                       //call generic decompression with decoder
+                       super.decompressToBlock(target, colixTargets);
+               }
+       }
+
+       @Override
+       public void decompressToBlock(MatrixBlock target, int colpos) 
+       {
+               if( LOW_LEVEL_OPT && getNumValues() > 1 )
+               {
+                       final int blksz = 128 * 1024;
+                       final int numCols = getNumCols();
+                       final int numVals = getNumValues();
+                       final int n = getNumRows();
+                       double[] c = target.getDenseBlock();
+                       
+                       //position and start offset arrays
+                       int[] apos = new int[numVals];
+                       int[] astart = new int[numVals];
+                       
+                       //cache conscious append via horizontal scans 
+                       for( int bi=0; bi<n; bi+=blksz ) {
+                               int bimax = Math.min(bi+blksz, n);              
                        
+                               for (int k=0, off=0; k < numVals; k++, 
off+=numCols) {
+                                       int bitmapOff = _ptr[k];
+                                       int bitmapLen = len(k);
+                                       int bufIx = apos[k];
+                                       if( bufIx >= bitmapLen ) 
+                                               continue;
+                                       int start = astart[k];
+                                       for( ; bufIx<bitmapLen & start<bimax; 
bufIx+=2) {
+                                               start += _data[bitmapOff + 
bufIx];
+                                               int len = _data[bitmapOff + 
bufIx+1];
+                                               for( int i=start; i<start+len; 
i++ )
+                                                       c[i] = 
_values[off+colpos];
+                                               start += len;
+                                       }
+                                       apos[k] = bufIx;        
+                                       astart[k] = start;
+                               }
+                       }
+                       
+                       target.recomputeNonZeros();
+               }
+               else
+               {
+                       //call generic decompression with decoder
+                       super.decompressToBlock(target, colpos);
+               }
+       }
+       
+       @Override
+       public void rightMultByVector(MatrixBlock vector, MatrixBlock result, 
int rl, int ru)
+                       throws DMLRuntimeException 
+       {
+               double[] b = ConverterUtils.getDenseVector(vector);
+               double[] c = result.getDenseBlock();
+               final int numCols = getNumCols();
+               final int numVals = getNumValues();
+               
+               //prepare reduced rhs w/ relevant values
+               double[] sb = new double[numCols];
+               for (int j = 0; j < numCols; j++) {
+                       sb[j] = b[_colIndexes[j]];
+               }
+               
+               if( LOW_LEVEL_OPT && numVals > 1 
+                       && _numRows > BitmapEncoder.BITMAP_BLOCK_SZ )
+               {
+                       //L3 cache alignment, see comment rightMultByVector OLE 
column group 
+                       //core difference of RLE to OLE is that runs are not 
segment alignment,
+                       //which requires care of handling runs crossing 
cache-buckets
+                       final int blksz = ColGroupBitmap.WRITE_CACHE_BLKSZ; 
+                       
+                       //step 1: prepare position and value arrays
+                       
+                       //current pos / values per RLE list
+                       int[] apos = new int[numVals];
+                       int[] astart = new int[numVals];
+                       double[] aval = new double[numVals];
+                       
+                       //skip-scan to beginning for all OLs 
+                       if( rl > 0 ) { //rl aligned with blksz  
+                               for (int k = 0; k < numVals; k++) {
+                                       int boff = _ptr[k];
+                                       int blen = len(k);
+                                       int bix = 0;
+                                       int start = 0;
+                                       while( bix<blen ) {     
+                                               int lstart = _data[boff + bix]; 
//start
+                                               int llen = _data[boff + bix + 
1]; //len
+                                               if( start+lstart+llen >= rl )
+                                                       break;
+                                               start += lstart + llen;
+                                               bix += 2;
+                                       }
+                                       apos[k] = bix;
+                                       astart[k] = start;
+                               }
+                       }
+                       
+                       //pre-aggregate values per OLs
+                       for( int k = 0; k < numVals; k++ )
+                               aval[k] = sumValues(k, sb);
+                                       
+                       //step 2: cache conscious matrix-vector via horizontal 
scans 
+                       for( int bi=rl; bi<ru; bi+=blksz ) 
+                       {
+                               int bimax = Math.min(bi+blksz, ru);
+                                       
+                               //horizontal segment scan, incl pos maintenance
+                               for (int k = 0; k < numVals; k++) {
+                                       int boff = _ptr[k];
+                                       int blen = len(k);
+                                       double val = aval[k];
+                                       int bix = apos[k];
+                                       int start = astart[k];
+                                       
+                                       //compute partial results, not aligned
+                                       while( bix<blen ) {
+                                               int lstart = _data[boff + bix];
+                                               int llen = _data[boff + bix + 
1];
+                                               LinearAlgebraUtils.vectAdd(val, 
c, Math.max(bi, start+lstart), 
+                                                               
Math.min(start+lstart+llen,bimax) - Math.max(bi, start+lstart));
+                                               if(start+lstart+llen >= bimax)
+                                                       break;
+                                               start += lstart + llen;
+                                               bix += 2;
+                                       }
+                                       
+                                       apos[k] = bix;  
+                                       astart[k] = start;
+                               }
+                       }
+               }
+               else
+               {
+                       for (int k = 0; k < numVals; k++) {
+                               int boff = _ptr[k];
+                               int blen = len(k);
+                               double val = sumValues(k, sb);
+                               int bix = 0;
+                               int start = 0;
+                               
+                               //scan to beginning offset if necessary 
+                               if( rl > 0 ) { //rl aligned with blksz  
+                                       while( bix<blen ) {     
+                                               int lstart = _data[boff + bix]; 
//start
+                                               int llen = _data[boff + bix + 
1]; //len
+                                               if( start+lstart+llen >= rl )
+                                                       break;
+                                               start += lstart + llen;
+                                               bix += 2;
+                                       }
+                               }
+                               
+                               //compute partial results, not aligned
+                               while( bix<blen ) {
+                                       int lstart = _data[boff + bix];
+                                       int llen = _data[boff + bix + 1];
+                                       LinearAlgebraUtils.vectAdd(val, c, 
Math.max(rl, start+lstart), 
+                                                       
Math.min(start+lstart+llen,ru) - Math.max(rl, start+lstart));
+                                       if(start+lstart+llen >= ru)
+                                               break;
+                                       start += lstart + llen;
+                                       bix += 2;
+                               }
+                       }
+               }
+       }
+
+       @Override
+       public void leftMultByRowVector(MatrixBlock vector, MatrixBlock result)
+                       throws DMLRuntimeException 
+       {               
+               double[] a = ConverterUtils.getDenseVector(vector);
+               double[] c = result.getDenseBlock();
+               final int numCols = getNumCols();
+               final int numVals = getNumValues();
+               final int n = getNumRows();
+               
+               if( LOW_LEVEL_OPT && numVals > 1 
+                       && _numRows > BitmapEncoder.BITMAP_BLOCK_SZ ) 
+               {
+                       final int blksz = ColGroupBitmap.READ_CACHE_BLKSZ; 
+                       
+                       //step 1: prepare position and value arrays
+                       
+                       //current pos per OLs / output values
+                       int[] apos = new int[numVals];
+                       int[] astart = new int[numVals];
+                       double[] cvals = new double[numVals];
+                       
+                       //step 2: cache conscious matrix-vector via horizontal 
scans 
+                       for( int ai=0; ai<n; ai+=blksz ) 
+                       {
+                               int aimax = Math.min(ai+blksz, n);
+                               
+                               //horizontal scan, incl pos maintenance
+                               for (int k = 0; k < numVals; k++) {
+                                       int bitmapOff = _ptr[k];
+                                       int bitmapLen = len(k);                 
                        
+                                       int bufIx = apos[k];
+                                       int start = astart[k];
+                                       
+                                       //compute partial results, not aligned
+                                       while( bufIx<bitmapLen & start<aimax ) {
+                                               start += _data[bitmapOff + 
bufIx];
+                                               int len = _data[bitmapOff + 
bufIx+1];
+                                               cvals[k] += 
LinearAlgebraUtils.vectSum(a, start, len);
+                                               start += len;
+                                               bufIx += 2;
+                                       }
+                                       
+                                       apos[k] = bufIx;        
+                                       astart[k] = start;
+                               }
+                       }
+                       
+                       //step 3: scale partial results by values and write to 
global output
+                       for (int k = 0, valOff=0; k < numVals; k++, 
valOff+=numCols)
+                               for( int j = 0; j < numCols; j++ )
+                                       c[ _colIndexes[j] ] += cvals[k] * 
_values[valOff+j];
+                       
+               }
+               else
+               {
+                       //iterate over all values and their bitmaps
+                       for (int bitmapIx=0, valOff=0; bitmapIx<numVals; 
bitmapIx++, valOff+=numCols) 
+                       {       
+                               int bitmapOff = _ptr[bitmapIx];
+                               int bitmapLen = len(bitmapIx);
+                               
+                               double vsum = 0;
+                               int curRunEnd = 0;
+                               for (int bufIx = 0; bufIx < bitmapLen; bufIx += 
2) {
+                                       int curRunStartOff = curRunEnd + 
_data[bitmapOff+bufIx];
+                                       int curRunLen = _data[bitmapOff+bufIx + 
1];
+                                       vsum += LinearAlgebraUtils.vectSum(a, 
curRunStartOff, curRunLen);
+                                       curRunEnd = curRunStartOff + curRunLen;
+                               }
+                               
+                               //scale partial results by values and write 
results
+                               for( int j = 0; j < numCols; j++ )
+                                       c[ _colIndexes[j] ] += vsum * _values[ 
valOff+j ];
+                       }
+               }
+       }
+
+       @Override
+       public ColGroup scalarOperation(ScalarOperator op)
+                       throws DMLRuntimeException 
+       {
+               double val0 = op.executeScalar(0);
+               
+               //fast path: sparse-safe operations
+               // Note that bitmaps don't change and are shallow-copied
+               if( op.sparseSafe || val0==0 ) {
+                       return new ColGroupRLE(_colIndexes, _numRows, 
+                                       applyScalarOp(op), _data, _ptr);
+               }
+               
+               //slow path: sparse-unsafe operations (potentially create new 
bitmap)
+               //note: for efficiency, we currently don't drop values that 
become 0
+               boolean[] lind = computeZeroIndicatorVector();
+               int[] loff = computeOffsets(lind);
+               if( loff.length==0 ) { //empty offset list: go back to fast path
+                       return new ColGroupRLE(_colIndexes, _numRows, 
+                                       applyScalarOp(op), _data, _ptr);
+               }
+               
+               double[] rvalues = applyScalarOp(op, val0, getNumCols());       
        
+               char[] lbitmap = BitmapEncoder.genRLEBitmap(loff);
+               char[] rbitmaps = Arrays.copyOf(_data, 
_data.length+lbitmap.length);
+               System.arraycopy(lbitmap, 0, rbitmaps, _data.length, 
lbitmap.length);
+               int[] rbitmapOffs = Arrays.copyOf(_ptr, _ptr.length+1);
+               rbitmapOffs[rbitmapOffs.length-1] = rbitmaps.length; 
+               
+               return new ColGroupRLE(_colIndexes, _numRows, 
+                               rvalues, rbitmaps, rbitmapOffs);
+       }
+       
+       @Override
+       public void unaryAggregateOperations(AggregateUnaryOperator op, 
MatrixBlock result) 
+               throws DMLRuntimeException 
+       {
+               KahanFunction kplus = (op.aggOp.increOp.fn instanceof 
KahanPlus) ?
+                               KahanPlus.getKahanPlusFnObject() : 
KahanPlusSq.getKahanPlusSqFnObject();
+               
+               if( op.indexFn instanceof ReduceAll )
+                       computeSum(result, kplus);
+               else if( op.indexFn instanceof ReduceCol )
+                       computeRowSums(result, kplus);
+               else if( op.indexFn instanceof ReduceRow )
+                       computeColSums(result, kplus);
+       }
+       
+       /**
+        * 
+        * @param result
+        */
+       private void computeSum(MatrixBlock result, KahanFunction kplus)
+       {
+               KahanObject kbuff = new KahanObject(result.quickGetValue(0, 0), 
result.quickGetValue(0, 1));
+               
+               final int numCols = getNumCols();
+               final int numVals = getNumValues();
+               
+               for (int bitmapIx = 0; bitmapIx < numVals; bitmapIx++) {
+                       int bitmapOff = _ptr[bitmapIx];
+                       int bitmapLen = len(bitmapIx);
+                       int valOff = bitmapIx * numCols;
+                       int curRunEnd = 0;
+                       int count = 0;
+                       for (int bufIx = 0; bufIx < bitmapLen; bufIx += 2) {
+                               int curRunStartOff = curRunEnd + 
_data[bitmapOff+bufIx];
+                               curRunEnd = curRunStartOff + 
_data[bitmapOff+bufIx + 1];
+                               count += curRunEnd-curRunStartOff;
+                       }
+                       
+                       //scale counts by all values
+                       for( int j = 0; j < numCols; j++ )
+                               kplus.execute3(kbuff, _values[ valOff+j ], 
count);
+               }
+               
+               result.quickSetValue(0, 0, kbuff._sum);
+               result.quickSetValue(0, 1, kbuff._correction);
+       }
+       
+       /**
+        * 
+        * @param result
+        */
+       private void computeRowSums(MatrixBlock result, KahanFunction kplus)
+       {
+               KahanObject kbuff = new KahanObject(0, 0);
+               final int numVals = getNumValues();
+               
+               for (int bitmapIx = 0; bitmapIx < numVals; bitmapIx++) {
+                       int bitmapOff = _ptr[bitmapIx];
+                       int bitmapLen = len(bitmapIx);
+                       double val = sumValues(bitmapIx);
+                                       
+                       if (val != 0.0) {
+                               int curRunStartOff = 0;
+                               int curRunEnd = 0;
+                               for (int bufIx = 0; bufIx < bitmapLen; bufIx += 
2) {
+                                       curRunStartOff = curRunEnd + 
_data[bitmapOff+bufIx];
+                                       curRunEnd = curRunStartOff + 
_data[bitmapOff+bufIx + 1];
+                                       for (int rix = curRunStartOff; rix < 
curRunEnd; rix++) {
+                                               
kbuff.set(result.quickGetValue(rix, 0), result.quickGetValue(rix, 1));
+                                               kplus.execute2(kbuff, val);
+                                               result.quickSetValue(rix, 0, 
kbuff._sum);
+                                               result.quickSetValue(rix, 1, 
kbuff._correction);
+                                       }
+                               }
+                       }
+               }
+       }
+       
+       /**
+        * 
+        * @param result
+        */
+       private void computeColSums(MatrixBlock result, KahanFunction kplus)
+       {
+               KahanObject kbuff = new KahanObject(0, 0);
+               
+               final int numCols = getNumCols();
+               final int numVals = getNumValues();
+               
+               for (int bitmapIx = 0; bitmapIx < numVals; bitmapIx++) {
+                       int bitmapOff = _ptr[bitmapIx];
+                       int bitmapLen = len(bitmapIx);
+                       int valOff = bitmapIx * numCols;
+                       int curRunEnd = 0;
+                       int count = 0;
+                       for (int bufIx = 0; bufIx < bitmapLen; bufIx += 2) {
+                               int curRunStartOff = curRunEnd + 
_data[bitmapOff+bufIx];
+                               curRunEnd = curRunStartOff + 
_data[bitmapOff+bufIx + 1];
+                               count += curRunEnd-curRunStartOff;
+                       }
+                       
+                       //scale counts by all values
+                       for( int j = 0; j < numCols; j++ ) {
+                               kbuff.set(result.quickGetValue(0, 
_colIndexes[j]),result.quickGetValue(1, _colIndexes[j]));
+                               kplus.execute3(kbuff, _values[ valOff+j ], 
count);
+                               result.quickSetValue(0, _colIndexes[j], 
kbuff._sum);
+                               result.quickSetValue(1, _colIndexes[j], 
kbuff._correction);
+                       }
+               }
+       }
+       
+       public boolean[] computeZeroIndicatorVector()
+               throws DMLRuntimeException 
+       {       
+               boolean[] ret = new boolean[_numRows];
+               final int numVals = getNumValues();
+
+               //initialize everything with zero
+               Arrays.fill(ret, true);
+               
+               for (int bitmapIx = 0; bitmapIx < numVals; bitmapIx++) {
+                       int bitmapOff = _ptr[bitmapIx];
+                       int bitmapLen = len(bitmapIx);
+                       
+                       int curRunStartOff = 0;
+                       int curRunEnd = 0;
+                       for (int bufIx = 0; bufIx < bitmapLen; bufIx += 2) {
+                               curRunStartOff = curRunEnd + 
_data[bitmapOff+bufIx];
+                               curRunEnd = curRunStartOff + 
_data[bitmapOff+bufIx + 1];
+                               Arrays.fill(ret, curRunStartOff, curRunEnd, 
false);
+                       }
+               }
+               
+               return ret;
+       }
+       
+       @Override
+       protected void countNonZerosPerRow(int[] rnnz)
+       {
+               final int numVals = getNumValues();
+               final int numCols = getNumCols();
+               
+               for (int k = 0; k < numVals; k++) {
+                       int bitmapOff = _ptr[k];
+                       int bitmapLen = len(k);
+                       
+                       int curRunStartOff = 0;
+                       int curRunEnd = 0;
+                       for (int bufIx = 0; bufIx < bitmapLen; bufIx += 2) {
+                               curRunStartOff = curRunEnd + 
_data[bitmapOff+bufIx];
+                               curRunEnd = curRunStartOff + 
_data[bitmapOff+bufIx + 1];
+                               for( int i=curRunStartOff; i<curRunEnd; i++ )
+                                       rnnz[i] += numCols;
+                       }
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/16e7b1c8/src/main/java/org/apache/sysml/runtime/compress/ColGroupUncompressed.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/compress/ColGroupUncompressed.java 
b/src/main/java/org/apache/sysml/runtime/compress/ColGroupUncompressed.java
new file mode 100644
index 0000000..d9b75a1
--- /dev/null
+++ b/src/main/java/org/apache/sysml/runtime/compress/ColGroupUncompressed.java
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.compress;
+
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.sysml.runtime.DMLRuntimeException;
+import org.apache.sysml.runtime.matrix.data.LibMatrixAgg;
+import org.apache.sysml.runtime.matrix.data.LibMatrixMult;
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+import org.apache.sysml.runtime.matrix.operators.AggregateUnaryOperator;
+import org.apache.sysml.runtime.matrix.operators.ScalarOperator;
+import org.apache.sysml.runtime.util.SortUtils;
+
+
+/**
+ * Column group type for columns that are stored as dense arrays of doubles.
+ * Uses a MatrixBlock internally to store the column contents.
+ * 
+ */
+public class ColGroupUncompressed extends ColGroup 
+{
+       private static final long serialVersionUID = 4870546053280378891L;
+
+       /**
+        * We store the contents of the columns as a MatrixBlock to take 
advantage
+        * of high-performance routines available for this data structure.
+        */
+       private MatrixBlock _data;
+
+       public ColGroupUncompressed() {
+               super(CompressionType.UNCOMPRESSED, (int[])null, -1);
+       }
+       
+       /**
+        * Main constructor.
+        * 
+        * @param colIndicesList
+        *            indices (relative to the current block) of the columns 
that
+        *            this column group represents.
+        * @param rawblock
+        *            the uncompressed block; uncompressed data must be present 
at
+        *            the time that the constructor is called
+        * @throws DMLRuntimeException
+        */
+       @SuppressWarnings("unused")
+       public ColGroupUncompressed(List<Integer> colIndicesList, MatrixBlock 
rawblock) 
+               throws DMLRuntimeException 
+       {
+               super(CompressionType.UNCOMPRESSED, colIndicesList, 
+                               CompressedMatrixBlock.TRANSPOSE_INPUT ? 
+                               rawblock.getNumColumns() : 
rawblock.getNumRows());
+
+               //prepare meta data
+               int numRows = CompressedMatrixBlock.TRANSPOSE_INPUT ? 
+                               rawblock.getNumColumns() : 
rawblock.getNumRows();
+               
+               // Create a matrix with just the requested rows of the original 
block
+               _data = new MatrixBlock(numRows,
+                               _colIndexes.length, 
rawblock.isInSparseFormat());
+
+               //ensure sorted col indices
+               if(!SortUtils.isSorted(0, _colIndexes.length, _colIndexes))
+                       Arrays.sort(_colIndexes);
+               
+               //special cases empty blocks
+               if (rawblock.isEmptyBlock(false))
+                       return;
+               //special cases full block
+               if( !CompressedMatrixBlock.TRANSPOSE_INPUT &&
+                       _data.getNumColumns() == rawblock.getNumColumns() ){
+                       _data.copy(rawblock);
+                       return;
+               }
+               
+               // dense implementation for dense and sparse matrices to avoid 
linear search
+               int m = numRows;
+               int n = _colIndexes.length;
+               for( int i = 0; i < m; i++) {
+                       for( int j = 0; j < n; j++ ) {
+                               double val = 
CompressedMatrixBlock.TRANSPOSE_INPUT ?
+                                               
rawblock.quickGetValue(_colIndexes[j], i) :
+                                               rawblock.quickGetValue(i, 
_colIndexes[j]);      
+                               _data.appendValue(i, j, val);
+                       }
+               }
+               _data.examSparsity();
+       }
+
+       /**
+        * Constructor for creating temporary decompressed versions of one or 
more
+        * compressed column groups.
+        * 
+        * @param groupsToDecompress
+        *            compressed columns to subsume. Must contain at least one
+        *            element.
+        */
+       public ColGroupUncompressed(ArrayList<ColGroup> groupsToDecompress) 
+       {
+               super(CompressionType.UNCOMPRESSED, 
+                               mergeColIndices(groupsToDecompress),
+                               groupsToDecompress.get(0)._numRows);
+
+               // Invert the list of column indices
+               int maxColIndex = _colIndexes[_colIndexes.length - 1];
+               int[] colIndicesInverted = new int[maxColIndex + 1];
+               for (int i = 0; i < _colIndexes.length; i++) {
+                       colIndicesInverted[_colIndexes[i]] = i;
+               }
+
+               // Create the buffer that holds the uncompressed data, packed 
together
+               _data = new MatrixBlock(_numRows, _colIndexes.length, false);
+
+               for (ColGroup colGroup : groupsToDecompress) {
+                       colGroup.decompressToBlock(_data, colIndicesInverted);
+               }
+       }
+
+       /**
+        * Constructor for internal use. Used when a method needs to build an
+        * instance of this class from scratch.
+        * 
+        * @param colIndices
+        *            column mapping for this column group
+        * @param numRows
+        *            number of rows in the column, for passing to the 
superclass
+        * @param colContents
+        *            uncompressed cell values
+        */
+       public ColGroupUncompressed(int[] colIndices, int numRows, MatrixBlock 
data) 
+       {
+               super(CompressionType.UNCOMPRESSED, colIndices, numRows);
+               _data = data;
+       }
+
+
+       /**
+        * Access for superclass
+        * 
+        * @return direct pointer to the internal representation of the columns
+        */
+       public MatrixBlock getData() {
+               return _data;
+       }
+       
+       /**
+        * Subroutine of constructor.
+        * 
+        * @param groupsToDecompress
+        *            input to the constructor that decompresses into a 
temporary
+        *            UncompressedColGroup
+        * @return a merged set of column indices across all those groups
+        */
+       private static int[] mergeColIndices(ArrayList<ColGroup> 
groupsToDecompress) 
+       {
+               // Pass 1: Determine number of columns
+               int sz = 0;
+               for (ColGroup colGroup : groupsToDecompress) {
+                       sz += colGroup.getNumCols();
+               }
+
+               // Pass 2: Copy column offsets out
+               int[] ret = new int[sz];
+               int pos = 0;
+               for (ColGroup colGroup : groupsToDecompress) {
+                       int[] tmp = colGroup.getColIndices();
+                       System.arraycopy(tmp, 0, ret, pos, tmp.length);
+                       pos += tmp.length;
+               }
+
+               // Pass 3: Sort and return the list of columns
+               Arrays.sort(ret);
+               return ret;
+       }
+
+       @Override
+       public long estimateInMemorySize() {
+               long size = super.estimateInMemorySize();
+               // adding the size of colContents
+               return size + 8 + _data.estimateSizeInMemory();
+       }
+
+       @Override
+       public void decompressToBlock(MatrixBlock target) {
+               //empty block, nothing to add to output
+               if( _data.isEmptyBlock(false) )
+                       return;         
+               for (int row = 0; row < _data.getNumRows(); row++) {
+                       for (int colIx = 0; colIx < _colIndexes.length; 
colIx++) {
+                               int col = _colIndexes[colIx];
+                               double cellVal = _data.quickGetValue(row, 
colIx);
+                               target.quickSetValue(row, col, cellVal);
+                       }
+               }
+       }
+
+       @Override
+       public void decompressToBlock(MatrixBlock target, int[] 
colIndexTargets) {
+               //empty block, nothing to add to output
+               if( _data.isEmptyBlock(false) ) {
+                       return;         
+               }
+               // Run through the rows, putting values into the appropriate 
locations
+               for (int row = 0; row < _data.getNumRows(); row++) {
+                       for (int colIx = 0; colIx < _data.getNumColumns(); 
colIx++) {
+                               int origMatrixColIx = getColIndex(colIx);
+                               int col = colIndexTargets[origMatrixColIx];
+                               double cellVal = _data.quickGetValue(row, 
colIx);
+                               target.quickSetValue(row, col, cellVal);
+                       }
+               }       
+       }
+       
+       @Override
+       public void decompressToBlock(MatrixBlock target, int colpos) {
+               //empty block, nothing to add to output
+               if( _data.isEmptyBlock(false) ) {
+                       return;         
+               }
+               // Run through the rows, putting values into the appropriate 
locations
+               for (int row = 0; row < _data.getNumRows(); row++) {
+                       double cellVal = _data.quickGetValue(row, colpos);
+                       target.quickSetValue(row, 0, cellVal);
+               }       
+       }
+
+       @Override
+       public void rightMultByVector(MatrixBlock vector, MatrixBlock result, 
int rl, int ru)
+                       throws DMLRuntimeException 
+       {
+               // Pull out the relevant rows of the vector
+               int clen = _colIndexes.length;
+               
+               MatrixBlock shortVector = new MatrixBlock(clen, 1, false);
+               shortVector.allocateDenseBlock();
+               double[] b = shortVector.getDenseBlock();
+               for (int colIx = 0; colIx < clen; colIx++)
+                       b[colIx] = vector.quickGetValue(_colIndexes[colIx], 0);
+               shortVector.recomputeNonZeros();
+               
+               // Multiply the selected columns by the appropriate parts of 
the vector
+               LibMatrixMult.matrixMult(_data, shortVector, result, rl, ru);   
+       }
+       
+       @Override
+       public void leftMultByRowVector(MatrixBlock vector, MatrixBlock result)
+                       throws DMLRuntimeException 
+       {
+               MatrixBlock pret = new MatrixBlock(1, _colIndexes.length, 
false);
+               LibMatrixMult.matrixMult(vector, _data, pret);
+               
+               // copying partialResult to the proper indices of the result
+               if( !pret.isEmptyBlock(false) ) {
+                       double[] rsltArr = result.getDenseBlock();
+                       for (int colIx = 0; colIx < _colIndexes.length; colIx++)
+                               rsltArr[_colIndexes[colIx]] = 
pret.quickGetValue(0, colIx);
+                       result.recomputeNonZeros();
+               }
+       }
+       
+       public void leftMultByRowVector(MatrixBlock vector, MatrixBlock result, 
int k)
+                       throws DMLRuntimeException 
+       {
+               MatrixBlock pret = new MatrixBlock(1, _colIndexes.length, 
false);
+               LibMatrixMult.matrixMult(vector, _data, pret, k);
+               
+               // copying partialResult to the proper indices of the result
+               if( !pret.isEmptyBlock(false) ) {
+                       double[] rsltArr = result.getDenseBlock();
+                       for (int colIx = 0; colIx < _colIndexes.length; colIx++)
+                               rsltArr[_colIndexes[colIx]] = 
pret.quickGetValue(0, colIx);
+                       result.recomputeNonZeros();
+               }
+       }
+
+       @Override
+       public ColGroup scalarOperation(ScalarOperator op)
+                       throws DMLRuntimeException 
+       {
+               //execute scalar operations
+               MatrixBlock retContent = (MatrixBlock) _data
+                               .scalarOperations(op, new MatrixBlock());
+
+               //construct new uncompressed column group
+               return new ColGroupUncompressed(getColIndices(), 
_data.getNumRows(), retContent);
+       }
+       
+       @Override
+       public void unaryAggregateOperations(AggregateUnaryOperator op, 
MatrixBlock ret)
+               throws DMLRuntimeException 
+       {
+               //execute unary aggregate operations
+               LibMatrixAgg.aggregateUnaryMatrix(_data, ret, op);
+       }
+
+       @Override
+       public void readFields(DataInput in)
+               throws IOException 
+       {
+               //read col contents (w/ meta data)
+               _data = new MatrixBlock();
+               _data.readFields(in);           
+               _numRows = _data.getNumRows();
+               
+               //read col indices
+               int numCols = _data.getNumColumns();
+               _colIndexes = new int[ numCols ];
+               for( int i=0; i<numCols; i++ )
+                       _colIndexes[i] = in.readInt();
+       }
+
+       @Override
+       public void write(DataOutput out) 
+               throws IOException 
+       {
+               //write col contents first (w/ meta data)
+               _data.write(out);
+               
+               //write col indices
+               int len = _data.getNumColumns();
+               for( int i=0; i<len; i++ )
+                       out.writeInt( _colIndexes[i] );
+       }
+
+       @Override
+       public long getExactSizeOnDisk() {
+               return _data.getExactSizeOnDisk()
+                          + 4 * _data.getNumColumns();
+       }
+       
+       @Override
+       protected void countNonZerosPerRow(int[] rnnz)
+       {
+               for( int i=0; i<_data.getNumRows(); i++ )
+                       rnnz[i] += _data.recomputeNonZeros(i, i, 0, 
_data.getNumColumns()-1);
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/16e7b1c8/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java 
b/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
new file mode 100644
index 0000000..6ac26c6
--- /dev/null
+++ b/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
@@ -0,0 +1,1342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.compress;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.sysml.hops.OptimizerUtils;
+import org.apache.sysml.lops.MMTSJ.MMTSJType;
+import org.apache.sysml.lops.MapMultChain.ChainType;
+import org.apache.sysml.runtime.DMLRuntimeException;
+import org.apache.sysml.runtime.compress.ColGroup.CompressionType;
+import org.apache.sysml.runtime.compress.estim.CompressedSizeEstimator;
+import org.apache.sysml.runtime.compress.estim.CompressedSizeInfo;
+import org.apache.sysml.runtime.compress.estim.SizeEstimatorFactory;
+import org.apache.sysml.runtime.compress.utils.ConverterUtils;
+import org.apache.sysml.runtime.compress.utils.LinearAlgebraUtils;
+import org.apache.sysml.runtime.controlprogram.parfor.stat.Timing;
+import org.apache.sysml.runtime.functionobjects.KahanPlus;
+import org.apache.sysml.runtime.functionobjects.KahanPlusSq;
+import org.apache.sysml.runtime.functionobjects.Multiply;
+import org.apache.sysml.runtime.functionobjects.ReduceRow;
+import org.apache.sysml.runtime.instructions.cp.KahanObject;
+import org.apache.sysml.runtime.matrix.data.LibMatrixBincell;
+import org.apache.sysml.runtime.matrix.data.LibMatrixReorg;
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
+import org.apache.sysml.runtime.matrix.data.MatrixValue;
+import org.apache.sysml.runtime.matrix.data.SparseBlock;
+import org.apache.sysml.runtime.matrix.operators.AggregateBinaryOperator;
+import org.apache.sysml.runtime.matrix.operators.AggregateUnaryOperator;
+import org.apache.sysml.runtime.matrix.operators.BinaryOperator;
+import org.apache.sysml.runtime.matrix.operators.ScalarOperator;
+
+/**
+ * Experimental version of MatrixBlock that allows a compressed internal
+ * representation.
+ */
+public class CompressedMatrixBlock extends MatrixBlock implements 
Externalizable
+{
+       private static final long serialVersionUID = 7319972089143154057L;
+       
+       //internal configuration
+       public static final int MAX_NUMBER_COCODING_COLUMNS = 1000;
+       public static final double MIN_COMPRESSION_RATIO = 2.0;
+       public static final double MIN_RLE_RATIO = 1.0; // Minimum additional 
compression (non-RLE size / RLE size) before we switch to run-length encoding.
+       public static final boolean TRANSPOSE_INPUT = true;
+       public static final boolean MATERIALIZE_ZEROS = false;
+       public static final long MIN_PAR_AGG_THRESHOLD = 16*1024*1024; //16MB
+       public static final boolean INVESTIGATE_ESTIMATES = false;
+       private static final boolean LDEBUG = false; //local debug flag
+       
+       private static final Log LOG = 
LogFactory.getLog(CompressedMatrixBlock.class.getName());
+       
+       static{
+               // for internal debugging only
+               if( LDEBUG ) {
+                       Logger.getLogger("org.apache.sysml.runtime.compress")
+                                 .setLevel((Level) Level.DEBUG);
+               }       
+       }
+       
+       protected ArrayList<ColGroup> _colGroups = null;
+       protected CompressionStatistics _stats = null;
+       
+       public CompressedMatrixBlock() {
+               super(-1, -1, true);
+       }
+       
+       /**
+        * Main constructor for building a block from scratch.
+        * 
+        * @param rl
+        *            number of rows in the block
+        * @param cl
+        *            number of columns
+        * @param sparse
+        *            true if the UNCOMPRESSED representation of the block 
should be
+        *            sparse
+        */
+       public CompressedMatrixBlock(int rl, int cl, boolean sparse) {
+               super(rl, cl, sparse);
+       }
+
+       /**
+        * "Copy" constructor to populate this compressed block with the
+        * uncompressed contents of a conventional block. Does <b>not</b> 
compress
+        * the block.
+        */
+       public CompressedMatrixBlock(MatrixBlock mb) {
+               super(mb.getNumRows(), mb.getNumColumns(), 
mb.isInSparseFormat());
+               
+               //shallow copy (deep copy on compression, prevents unnecessary 
copy) 
+               if( isInSparseFormat() )
+                       sparseBlock = mb.getSparseBlock();
+               else
+                       denseBlock = mb.getDenseBlock();
+               nonZeros = mb.getNonZeros();
+       }
+
+       /**
+        * 
+        * @return the column groups constructed by the compression process.
+        * 
+        */
+       public ArrayList<ColGroup> getColGroups() {
+               return _colGroups;
+       }
+
+       /**
+        * @return true if this block is in compressed form; false if the block 
has
+        *         not yet been compressed
+        */
+       public boolean isCompressed() {
+               return (_colGroups != null);
+       }
+       
+       /**
+        * 
+        * @return
+        */
+       public boolean isSingleUncompressedGroup(){
+               return (_colGroups!=null && _colGroups.size()==1 
+                               && _colGroups.get(0) instanceof 
ColGroupUncompressed);
+       }
+
+       private void allocateColGroupList() {
+               _colGroups = new ArrayList<ColGroup>();
+       }
+       
+       @Override
+       public boolean isEmptyBlock(boolean safe)  {
+               if( !isCompressed() )
+                       return super.isEmptyBlock(safe);                
+               return (_colGroups == null || getNonZeros()==0);
+       }
+       
+       /**
+        * Compress the contents of this matrix block. After compression, the
+        * uncompressed data is discarded. Attempts to update this block after
+        * calling this method currently result in INCORRECT BEHAVIOR, something
+        * which should be fixed if we move ahead with this compression 
strategy.
+        * 
+        * +per column sparsity
+        */
+       public void compress() 
+               throws DMLRuntimeException 
+       {
+               //check for redundant compression
+               if( isCompressed() ){
+                       throw new DMLRuntimeException("Redundant compression, 
block already compressed.");
+               }
+
+               Timing time = new Timing(true);
+               _stats = new CompressionStatistics();
+               
+               // SAMPLE-BASED DECISIONS:
+               // Decisions such as testing if a column is amenable to bitmap
+               // compression or evaluating co-coding potentionls are made 
based on a
+               // subset of the rows. For large datasets, sampling might take a
+               // significant amount of time. So, we generate only one sample 
and use
+               // it for the entire compression process.
+
+               //prepare basic meta data and deep copy / transpose input
+               final int numRows = getNumRows();
+               final int numCols = getNumColumns();
+               final boolean sparse = isInSparseFormat();
+               MatrixBlock rawblock = this;
+               if( TRANSPOSE_INPUT )
+                       rawblock = LibMatrixReorg.transpose(rawblock, new 
MatrixBlock(numCols, numRows, sparse));
+               else
+                       rawblock = new MatrixBlock(this);
+               
+               //construct sample-based size estimator
+               CompressedSizeEstimator bitmapSizeEstimator = 
+                               SizeEstimatorFactory.getSizeEstimator(rawblock, 
numRows);
+
+               //allocate list of column groups
+               allocateColGroupList();
+
+               // The current implementation of this method is written for 
correctness,
+               // not for performance or for minimal use of temporary space.
+
+               // We start with a full set of columns.
+               HashSet<Integer> remainingCols = new HashSet<Integer>();
+               for (int i = 0; i < numCols; i++) {
+                       remainingCols.add(i);
+               }
+
+               // PHASE 1: Classify columns by compression type
+               // We start by determining which columns are amenable to bitmap
+               // compression
+
+               // It is correct to use the dense size as the uncompressed size
+               // FIXME not numRows but nnz / col otherwise too aggressive 
overestimation
+               // of uncompressed size and hence overestimation of compression 
potential
+               double uncompressedColumnSize = 8 * numRows;
+
+               // information about the bitmap amenable columns
+               List<Integer> bitmapCols = new ArrayList<Integer>();
+               List<Integer> uncompressedCols = new ArrayList<Integer>();
+               List<Integer> colsCardinalities = new ArrayList<Integer>();
+               List<Long> compressedSizes = new ArrayList<Long>();
+               HashMap<Integer, Double> compressionRatios = new 
HashMap<Integer, Double>();
+               
+               // Minimum ratio (size of uncompressed / size of compressed) 
that we
+               // will accept when encoding a field with a bitmap.
+               for (int col = 0; col < numCols; col++) 
+               {
+                       CompressedSizeInfo compressedSizeInfo = 
bitmapSizeEstimator
+                                       .estimateCompressedColGroupSize(new 
int[] { col });
+                       long compressedSize = compressedSizeInfo.getMinSize();
+                       double compRatio = uncompressedColumnSize / 
compressedSize;
+                       
+                       //FIXME: compression ratio should be checked against 1 
instead of min compression
+                       //ratio; I think this threshold was only required 
because we overestimated the 
+                       //the uncompressed column size with n\alpha instead of 
z\alpha
+                       if (compRatio >= MIN_COMPRESSION_RATIO) {
+                               bitmapCols.add(col);
+                               compressionRatios.put(col, compRatio);
+                               
colsCardinalities.add(compressedSizeInfo.getEstCarinality());
+                               compressedSizes.add(compressedSize);
+                       }
+                       else
+                               uncompressedCols.add(col);
+               }
+
+               _stats.timePhase1 = time.stop();
+               if( LOG.isDebugEnabled() )
+                       LOG.debug("compression phase 1: "+_stats.timePhase1);
+               
+               // Filters for additional types of compression should be 
inserted here.
+
+               // PHASE 2: Grouping columns
+               // Divide the bitmap columns into column groups.
+               List<int[]> bitmapColGrps = null;
+               if (bitmapCols.size() > MAX_NUMBER_COCODING_COLUMNS) {
+                       // Too many columns to compute co-coding groups with 
current methods.
+                       // Generate singleton groups.
+                       bitmapColGrps = new ArrayList<int[]>(bitmapCols.size());
+                       for (int col : bitmapCols) {
+                               bitmapColGrps.add(new int[] { col });
+                       }
+               } 
+               else {
+                       bitmapColGrps = 
PlanningCoCoder.findCocodesByPartitioning(
+                                       bitmapSizeEstimator, bitmapCols, 
colsCardinalities,
+                                       compressedSizes, numRows, 
isInSparseFormat() ? 
+                                       OptimizerUtils.getSparsity(numRows, 
numCols, getNonZeros()): 1);
+               }
+
+               _stats.timePhase2 = time.stop();
+               if( LOG.isDebugEnabled() )
+                       LOG.debug("compression phase 2: "+_stats.timePhase2);
+               
+               if( INVESTIGATE_ESTIMATES ) {
+                       double est = 0;
+                       for( int[] groupIndices : bitmapColGrps )
+                               est += 
bitmapSizeEstimator.estimateCompressedColGroupSize(groupIndices).getMinSize();
+                       est += uncompressedCols.size() * uncompressedColumnSize;
+                       _stats.estSize = est;
+               }
+               
+               // PHASE 3: Compress and correct sample-based decisions
+               
+               for (int[] groupIndices : bitmapColGrps) 
+               {
+                       int[] allGroupIndices = null;
+                       int allColsCount = groupIndices.length;
+                       CompressedSizeInfo bitmapSizeInfo;
+                       // The compression type is decided based on a full 
bitmap since it
+                       // will be reused for the actual compression step.
+                       UncompressedBitmap ubm;
+                       PriorityQueue<CompressedColumn> compRatioPQ = null;
+                       boolean skipGroup = false;
+                       while (true) 
+                       {
+                               ubm = BitmapEncoder.extractBitmap(groupIndices, 
rawblock); 
+                               bitmapSizeInfo = bitmapSizeEstimator
+                                               
.estimateCompressedColGroupSize(ubm);
+                               double compRatio = uncompressedColumnSize * 
groupIndices.length
+                                               / bitmapSizeInfo.getMinSize();
+                               if (compRatio >= MIN_COMPRESSION_RATIO) {
+                                       // we have a good group
+                                       for( Integer col : groupIndices )
+                                               remainingCols.remove(col);
+                                       break;
+                               } else {
+                                       // modify the group
+                                       if (compRatioPQ == null) {
+                                               // first modification
+                                               allGroupIndices = 
Arrays.copyOf(groupIndices, groupIndices.length);
+                                               compRatioPQ = new 
PriorityQueue<CompressedMatrixBlock.CompressedColumn>();
+                                               for (int i = 0; i < 
groupIndices.length; i++)
+                                                       compRatioPQ.add(new 
CompressedColumn(i,
+                                                                       
compressionRatios.get(groupIndices[i])));
+                                       }
+
+                                       // index in allGroupIndices
+                                       int removeIx = compRatioPQ.poll().colIx;
+                                       allGroupIndices[removeIx] = -1;
+                                       allColsCount--;
+                                       if (allColsCount == 0) {
+                                               skipGroup = true;
+                                               break;
+                                       }
+                                       groupIndices = new int[allColsCount];
+                                       // copying the values that do not equal 
-1
+                                       int ix = 0;
+                                       for (int col : allGroupIndices) {
+                                               if (col != -1) {
+                                                       groupIndices[ix++] = 
col;
+                                               }
+                                       }
+
+                               }
+                       }
+
+                       if (skipGroup)
+                               continue;
+                       long rleNumBytes = bitmapSizeInfo.getRLESize();
+                       long offsetNumBytes = bitmapSizeInfo.getOLESize();
+                       double rleRatio = (double) offsetNumBytes / (double) 
rleNumBytes;
+
+                       if (rleRatio > MIN_RLE_RATIO) {
+                               ColGroupRLE compressedGroup = new 
ColGroupRLE(groupIndices,
+                                               numRows, ubm);
+                               _colGroups.add(compressedGroup);
+                       } 
+                       else {
+                               ColGroupOLE compressedGroup = new ColGroupOLE(
+                                               groupIndices, numRows, ubm);
+                               _colGroups.add(compressedGroup);
+                       }
+
+               }
+               
+               _stats.timePhase3 = time.stop();
+               if( LOG.isDebugEnabled() )
+                       LOG.debug("compression phase 3: "+_stats.timePhase3);
+               
+               // Phase 4: Cleanup
+               // The remaining columns are stored uncompressed as one big 
column group
+               if (remainingCols.size() > 0) {
+                       ArrayList<Integer> list = new 
ArrayList<Integer>(remainingCols);
+                       ColGroupUncompressed ucgroup = new 
ColGroupUncompressed(list, rawblock);
+                       _colGroups.add(ucgroup);
+               }
+
+               //final cleanup (discard uncompressed block)
+               rawblock.cleanupBlock(true, true);
+               this.cleanupBlock(true, true);
+               
+               _stats.timePhase4 = time.stop();
+               if( LOG.isDebugEnabled() )
+                       LOG.debug("compression phase 4: "+_stats.timePhase4);
+       }
+
+       /**
+        * @return a new uncompressed matrix block containing the contents of 
this
+        *         block
+        */
+       public MatrixBlock decompress() throws DMLRuntimeException 
+       {
+               //early abort for not yet compressed blocks
+               if( !isCompressed() )
+                       return new MatrixBlock(this); 
+               
+               //preallocation sparse rows to avoid repeated reallocations     
        
+               MatrixBlock ret = new MatrixBlock(getNumRows(), 
getNumColumns(), isInSparseFormat(), getNonZeros());
+               if( ret.isInSparseFormat() ) {
+                       int[] rnnz = new int[rlen];
+                       for (ColGroup grp : _colGroups)
+                               grp.countNonZerosPerRow(rnnz);
+                       ret.allocateSparseRowsBlock();
+                       SparseBlock rows = ret.getSparseBlock();
+                       for( int i=0; i<rlen; i++ )
+                               rows.allocate(i, rnnz[i]);
+               }
+               
+               //core decompression (append if sparse)
+               for (ColGroup grp : _colGroups)
+                       grp.decompressToBlock(ret);
+               
+               //post-processing (for append in decompress)
+               if( isInSparseFormat() )
+                       ret.sortSparseRows();
+
+               return ret;
+       }
+
+       public CompressionStatistics getCompressionStatistics(){
+               return _stats;
+       }
+
+       /**
+        * 
+        * @return an upper bound on the memory used to store this compressed 
block
+        *         considering class overhead.
+        */
+       public long estimateCompressedSizeInMemory() {
+               if (!isCompressed())
+                       return 0;
+               // basic data inherited from MatrixBlock
+               long total = MatrixBlock.estimateSizeInMemory(0, 0, 0);
+               // adding the size of colGroups ArrayList overhead
+               // object overhead (32B) + int size (4B) + int modCount (4B) + 
Object[]
+               // elementData overhead + reference (32+8)B +reference ofr each 
Object (8B)
+               total += 80 + 8 * _colGroups.size();
+               for (ColGroup grp : _colGroups)
+                       total += grp.estimateInMemorySize();
+               return total;
+       }
+
+       private class CompressedColumn implements Comparable<CompressedColumn> {
+               int colIx;
+               double compRatio;
+
+               public CompressedColumn(int colIx, double compRatio) {
+                       this.colIx = colIx;
+                       this.compRatio = compRatio;
+               }
+
+               @Override
+               public int compareTo(CompressedColumn o) {
+                       return (int) Math.signum(compRatio - o.compRatio);
+               }
+       }
+       
+       public static class CompressionStatistics {
+               public double timePhase1 = -1;
+               public double timePhase2 = -1;
+               public double timePhase3 = -1;
+               public double timePhase4 = -1;
+               public double estSize = -1;
+               
+               public CompressionStatistics() {
+                       //do nothing
+               }
+               
+               public CompressionStatistics(double t1, double t2, double t3, 
double t4){
+                       timePhase1 = t1;
+                       timePhase2 = t2;
+                       timePhase3 = t3;
+                       timePhase4 = t4;
+               }
+       } 
+
+       
+       //////////////////////////////////////////
+       // Serialization / Deserialization
+
+       @Override
+       public long getExactSizeOnDisk() 
+       {
+               //header information
+               long ret = 12;
+               
+               for( ColGroup grp : _colGroups ) {
+                       ret += 1; //type info
+                       ret += grp.getExactSizeOnDisk();
+               }
+               
+               return ret;
+       }
+       
+       @Override
+       public void readFields(DataInput in) 
+               throws IOException 
+       {
+               boolean compressed = in.readBoolean();
+               
+               //deserialize uncompressed block
+               if( !compressed ) {
+                       super.readFields(in);
+                       return;
+               }
+               
+               //deserialize compressed block
+               rlen = in.readInt();
+               clen = in.readInt();
+               nonZeros = in.readLong();
+               int ncolGroups = in.readInt();
+               
+               _colGroups = new ArrayList<ColGroup>(ncolGroups);
+               for( int i=0; i<ncolGroups; i++ ) 
+               {
+                       CompressionType ctype = 
CompressionType.values()[in.readByte()];
+                       ColGroup grp = null;
+                       
+                       //create instance of column group
+                       switch( ctype ) {
+                               case UNCOMPRESSED:
+                                       grp = new ColGroupUncompressed(); break;
+                               case OLE_BITMAP:
+                                       grp = new ColGroupOLE(); break;
+                               case RLE_BITMAP:
+                                       grp = new ColGroupRLE(); break;
+                       }
+                       
+                       //deserialize and add column group
+                       grp.readFields(in);
+                       _colGroups.add(grp);
+               }
+       }
+       
+       @Override
+       public void write(DataOutput out) 
+               throws IOException 
+       {
+               out.writeBoolean( isCompressed() );
+               
+               //serialize uncompressed block
+               if( !isCompressed() ) {
+                       super.write(out);
+                       return;
+               }
+               
+               //serialize compressed matrix block
+               out.writeInt(rlen);
+               out.writeInt(clen);
+               out.writeLong(nonZeros);
+               out.writeInt(_colGroups.size());
+               
+               for( ColGroup grp : _colGroups ) {
+                       out.writeByte( grp.getCompType().ordinal() );
+                       grp.write(out); //delegate serialization
+               }
+       }
+       
+       
+       /**
+        * Redirects the default java serialization via externalizable to our 
default 
+        * hadoop writable serialization for efficient broadcast/rdd 
deserialization. 
+        * 
+        * @param is
+        * @throws IOException
+        */
+       @Override
+       public void readExternal(ObjectInput is) 
+               throws IOException
+       {
+               readFields(is);
+       }
+       
+       /**
+        * Redirects the default java serialization via externalizable to our 
default 
+        * hadoop writable serialization for efficient broadcast/rdd 
serialization. 
+        * 
+        * @param is
+        * @throws IOException
+        */
+       @Override
+       public void writeExternal(ObjectOutput os) 
+               throws IOException
+       {
+               write(os);      
+       }
+       
+       
+       //////////////////////////////////////////
+       // Operations (overwrite existing ops for seamless integration)
+
+       @Override
+       public MatrixValue scalarOperations(ScalarOperator sop, MatrixValue 
result) 
+               throws DMLRuntimeException
+       {
+               //call uncompressed matrix scalar if necessary
+               if( !isCompressed() ) {
+                       return super.scalarOperations(sop, result);
+               }
+               
+               //allocate the output matrix block
+               CompressedMatrixBlock ret = null;
+               if( result==null || !(result instanceof CompressedMatrixBlock) )
+                       ret = new CompressedMatrixBlock(getNumRows(), 
getNumColumns(), sparse);
+               else {
+                       ret = (CompressedMatrixBlock) result;
+                       ret.reset(rlen, clen);
+               }
+               
+               // Apply the operation recursively to each of the column groups.
+               // Most implementations will only modify metadata.
+               ArrayList<ColGroup> newColGroups = new ArrayList<ColGroup>();
+               for (ColGroup grp : _colGroups) {
+                       newColGroups.add(grp.scalarOperation(sop));
+               }
+               ret._colGroups = newColGroups;
+               ret.setNonZeros(rlen*clen);
+               
+               return ret;
+       }
+
+       @Override
+       public MatrixBlock appendOperations(MatrixBlock that, MatrixBlock ret) 
+               throws DMLRuntimeException
+       {
+               //call uncompressed matrix append if necessary
+               if( !isCompressed() ) {
+                       if( that instanceof CompressedMatrixBlock )
+                               that = ((CompressedMatrixBlock) 
that).decompress();
+                       return super.appendOperations(that, ret);
+               }
+               
+               final int m = rlen;
+               final int n = clen+that.getNumColumns();
+               final long nnz = nonZeros+that.getNonZeros();           
+               
+               //init result matrix 
+               CompressedMatrixBlock ret2 = null;
+               if( ret == null || !(ret instanceof CompressedMatrixBlock) ) {
+                       ret2 = new CompressedMatrixBlock(m, n, 
isInSparseFormat());
+               }
+               else {
+                       ret2 = (CompressedMatrixBlock) ret;
+                       ret2.reset(m, n);
+               }
+                       
+               //shallow copy of lhs column groups
+               ret2.allocateColGroupList();
+               ret2._colGroups.addAll(_colGroups);
+               
+               //copy of rhs column groups w/ col index shifting
+               if( !(that instanceof CompressedMatrixBlock) ) {
+                       that = new CompressedMatrixBlock(that);
+                       ((CompressedMatrixBlock)that).compress();
+               }
+               ArrayList<ColGroup> inColGroups = ((CompressedMatrixBlock) 
that)._colGroups;
+               for( ColGroup group : inColGroups ) {
+                       ColGroup tmp = ConverterUtils.copyColGroup(group);
+                       tmp.shiftColIndices(clen);
+                       ret2._colGroups.add(tmp);
+               }
+               
+               //meta data maintenance
+               ret2.setNonZeros(nnz);          
+               return ret2;
+       }
+       
+       @Override
+       public MatrixBlock chainMatrixMultOperations(MatrixBlock v, MatrixBlock 
w, MatrixBlock out, ChainType ctype) 
+               throws DMLRuntimeException 
+       {
+               //call uncompressed matrix mult if necessary
+               if( !isCompressed() ) {
+                       return super.chainMatrixMultOperations(v, w, out, 
ctype);
+               }
+               
+               //single-threaded mmchain of single uncompressed colgroup
+               if( isSingleUncompressedGroup() ){
+                       return ((ColGroupUncompressed)_colGroups.get(0))
+                               .getData().chainMatrixMultOperations(v, w, out, 
ctype);
+               }
+               
+               //Timing time = new Timing(true);
+               
+               //prepare result
+               if( out != null )
+                       out.reset(clen, 1, false);
+               else 
+                       out = new MatrixBlock(clen, 1, false);
+               
+               //empty block handling
+               if( isEmptyBlock(false) ) 
+                       return out;
+                       
+               //compute matrix mult
+               MatrixBlock tmp = new MatrixBlock(rlen, 1, false);
+               rightMultByVector(v, tmp);
+               if( ctype == ChainType.XtwXv ) {
+                       BinaryOperator bop = new 
BinaryOperator(Multiply.getMultiplyFnObject());
+                       LibMatrixBincell.bincellOpInPlace(tmp, w, bop);
+               }
+               leftMultByVectorTranspose(_colGroups, tmp, out, true);
+               
+               //System.out.println("Compressed MMChain in "+time.stop());
+               
+               return out;
+       }
+
+       @Override
+       public MatrixBlock chainMatrixMultOperations(MatrixBlock v, MatrixBlock 
w, MatrixBlock out, ChainType ctype, int k) 
+               throws DMLRuntimeException 
+       {
+               //call uncompressed matrix mult if necessary
+               if( !isCompressed() ){
+                       return super.chainMatrixMultOperations(v, w, out, 
ctype, k);
+               }
+               
+               //multi-threaded mmchain of single uncompressed colgroup
+               if( isSingleUncompressedGroup() ){
+                       return ((ColGroupUncompressed)_colGroups.get(0))
+                               .getData().chainMatrixMultOperations(v, w, out, 
ctype, k);
+               }
+
+               Timing time = LOG.isDebugEnabled() ? new Timing(true) : null;
+               
+               //prepare result
+               if( out != null )
+                       out.reset(clen, 1, false);
+               else 
+                       out = new MatrixBlock(clen, 1, false);
+               
+               //empty block handling
+               if( isEmptyBlock(false) ) 
+                       return out;
+               
+               //compute matrix mult
+               MatrixBlock tmp = new MatrixBlock(rlen, 1, false);
+               rightMultByVector(v, tmp, k);
+               if( ctype == ChainType.XtwXv ) {
+                       BinaryOperator bop = new 
BinaryOperator(Multiply.getMultiplyFnObject());
+                       LibMatrixBincell.bincellOpInPlace(tmp, w, bop);
+               }
+               leftMultByVectorTranspose(_colGroups, tmp, out, true, k);
+               
+               if( LOG.isDebugEnabled() )
+                       LOG.debug("Compressed MMChain k="+k+" in "+time.stop());
+               
+               return out;
+       }
+       
+       @Override
+       public MatrixValue aggregateBinaryOperations(MatrixValue mv1, 
MatrixValue mv2, MatrixValue result, AggregateBinaryOperator op)
+                       throws DMLRuntimeException 
+       {
+               //call uncompressed matrix mult if necessary
+               if( !isCompressed() ) {
+                       return super.aggregateBinaryOperations(mv1, mv2, 
result, op);
+               }
+       
+               //multi-threaded mm of single uncompressed colgroup
+               if( isSingleUncompressedGroup() ){
+                       MatrixBlock tmp = 
((ColGroupUncompressed)_colGroups.get(0)).getData();
+                       return tmp.aggregateBinaryOperations(this==mv1?tmp:mv1, 
this==mv2?tmp:mv2, result, op);
+               }
+               
+               Timing time = LOG.isDebugEnabled() ? new Timing(true) : null;
+               
+               //setup meta data (dimensions, sparsity)
+               int rl = mv1.getNumRows();
+               int cl = mv2.getNumColumns();
+               
+               //create output matrix block
+               MatrixBlock ret = (MatrixBlock) result;
+               if( ret==null )
+                       ret = new MatrixBlock(rl, cl, false, rl*cl);
+               else
+                       ret.reset(rl, cl, false, rl*cl);
+               
+               //compute matrix mult
+               if( mv1.getNumRows()>1 && mv2.getNumColumns()==1 ) { //MV right
+                       CompressedMatrixBlock cmb = (CompressedMatrixBlock)mv1;
+                       MatrixBlock mb = (MatrixBlock) mv2;
+                       if( op.getNumThreads()>1 )
+                               cmb.rightMultByVector(mb, ret, 
op.getNumThreads());
+                       else
+                               cmb.rightMultByVector(mb, ret);
+               }
+               else if( mv1.getNumRows()==1 && mv2.getNumColumns()>1 ) { //MV 
left
+                       MatrixBlock mb = (MatrixBlock) mv1;
+                       if( op.getNumThreads()>1 )
+                               leftMultByVectorTranspose(_colGroups, mb, ret, 
false, op.getNumThreads());
+                       else
+                               leftMultByVectorTranspose(_colGroups, mb, ret, 
false);
+               }
+               else {
+                       //NOTE: we could decompress and invoke 
super.aggregateBinary but for now
+                       //we want to have an eager fail if this happens
+                       throw new DMLRuntimeException("Unsupported 
matrix-matrix multiplication over compressed matrix block.");
+               }
+               
+               if( LOG.isDebugEnabled() )
+                       LOG.debug("Compressed MM in "+time.stop());
+               
+               return ret;
+       }
+       
+       @Override
+       public MatrixValue aggregateUnaryOperations(AggregateUnaryOperator op, 
MatrixValue result, 
+                       int blockingFactorRow, int blockingFactorCol, 
MatrixIndexes indexesIn, boolean inCP) 
+               throws DMLRuntimeException
+       {
+               //call uncompressed matrix mult if necessary
+               if( !isCompressed() ) {
+                       return super.aggregateUnaryOperations(op, result, 
blockingFactorRow, blockingFactorCol, indexesIn, inCP);
+               }
+               
+               //check for supported operations
+               if( !(op.aggOp.increOp.fn instanceof KahanPlus || 
op.aggOp.increOp.fn instanceof KahanPlusSq) ){
+                       throw new DMLRuntimeException("Unary aggregates other 
than sums not supported yet.");
+               }
+               
+               //prepare output dimensions
+               CellIndex tempCellIndex = new CellIndex(-1,-1);
+               op.indexFn.computeDimension(rlen, clen, tempCellIndex);
+               if(op.aggOp.correctionExists) {
+                       switch(op.aggOp.correctionLocation)
+                       {
+                               case LASTROW: tempCellIndex.row++;  break;
+                               case LASTCOLUMN: tempCellIndex.column++; break;
+                               case LASTTWOROWS: tempCellIndex.row+=2; break;
+                               case LASTTWOCOLUMNS: tempCellIndex.column+=2; 
break;
+                               default:
+                                       throw new 
DMLRuntimeException("unrecognized correctionLocation: 
"+op.aggOp.correctionLocation); 
+                       }
+               }
+               
+               //prepare output
+               if(result==null)
+                       result=new MatrixBlock(tempCellIndex.row, 
tempCellIndex.column, false);
+               else
+                       result.reset(tempCellIndex.row, tempCellIndex.column, 
false);
+               
+               MatrixBlock ret = (MatrixBlock) result;
+               
+               //core unary aggregate
+               if(    op.getNumThreads() > 1 
+                       && getExactSizeOnDisk() > MIN_PAR_AGG_THRESHOLD ) 
+               {
+                       // initialize and allocate the result
+                       ret.allocateDenseBlock();
+                       
+                       //multi-threaded execution of all groups 
+                       ArrayList<ColGroup>[] grpParts = 
createStaticTaskPartitioning(op.getNumThreads(), false);
+                       ColGroupUncompressed uc = getUncompressedColGroup();
+                       try {
+                               //compute all compressed column groups
+                               ExecutorService pool = 
Executors.newFixedThreadPool( op.getNumThreads() );
+                               ArrayList<UnaryAggregateTask> tasks = new 
ArrayList<UnaryAggregateTask>();
+                               for( ArrayList<ColGroup> grp : grpParts )
+                                       tasks.add(new UnaryAggregateTask(grp, 
ret, op));
+                               pool.invokeAll(tasks);  
+                               pool.shutdown();
+                               //compute uncompressed column group in parallel 
(otherwise bottleneck)
+                               if( uc != null )
+                                        ret = 
(MatrixBlock)uc.getData().aggregateUnaryOperations(op, ret, blockingFactorRow, 
blockingFactorCol, indexesIn, false);                                     
+                               //aggregate partial results
+                               if( !(op.indexFn instanceof ReduceRow) ){
+                                       KahanObject kbuff = new 
KahanObject(0,0);
+                                       KahanPlus kplus = 
KahanPlus.getKahanPlusFnObject();
+                                       for( int i=0; i<ret.getNumRows(); i++ ) 
{
+                                               kbuff.set(ret.quickGetValue(i, 
0), ret.quickGetValue(i, 0));
+                                               for( UnaryAggregateTask task : 
tasks )
+                                                       kplus.execute2(kbuff, 
task.getResult().quickGetValue(i, 0));
+                                               ret.quickSetValue(i, 0, 
kbuff._sum);
+                                               ret.quickSetValue(i, 1, 
kbuff._correction);
+                                       }
+                               }               
+                       }
+                       catch(Exception ex) {
+                               throw new DMLRuntimeException(ex);
+                       }
+               }
+               else {
+                       for (ColGroup grp : _colGroups) {
+                               grp.unaryAggregateOperations(op, ret);
+                       }
+               }
+               
+               //drop correction if necessary
+               if(op.aggOp.correctionExists && inCP)
+                       ret.dropLastRowsOrColums(op.aggOp.correctionLocation);
+       
+               //post-processing
+               ret.recomputeNonZeros();
+               
+               return ret;
+       }
+       
+       @Override
+       public MatrixBlock transposeSelfMatrixMultOperations(MatrixBlock out, 
MMTSJType tstype) 
+               throws DMLRuntimeException 
+       {
+               //call uncompressed matrix mult if necessary
+               if( !isCompressed() ) {
+                       return super.transposeSelfMatrixMultOperations(out, 
tstype);
+               }
+                               
+               //single-threaded tsmm of single uncompressed colgroup
+               if( isSingleUncompressedGroup() ){
+                       return ((ColGroupUncompressed)_colGroups.get(0))
+                               
.getData().transposeSelfMatrixMultOperations(out, tstype);
+               }
+
+               Timing time = LOG.isDebugEnabled() ? new Timing(true) : null;
+
+               //check for transpose type
+               if( tstype != MMTSJType.LEFT ) //right not supported yet
+                       throw new DMLRuntimeException("Invalid MMTSJ type 
'"+tstype.toString()+"'.");
+               
+               //create output matrix block
+               if( out == null )
+                       out = new MatrixBlock(clen, clen, false);
+               else
+                       out.reset(clen, clen, false);
+               out.allocateDenseBlock();
+               
+               if( !isEmptyBlock(false) ) {
+                       //compute matrix mult
+                       leftMultByTransposeSelf(_colGroups, out, 0, 
_colGroups.size());
+                       
+                       // post-processing
+                       out.recomputeNonZeros();
+               }
+               
+               if( LOG.isDebugEnabled() )
+                       LOG.debug("Compressed TSMM in "+time.stop());
+               
+               return out;
+       }
+
+       
+       @Override
+       public MatrixBlock transposeSelfMatrixMultOperations(MatrixBlock out, 
MMTSJType tstype, int k) 
+               throws DMLRuntimeException 
+       {
+               //call uncompressed matrix mult if necessary
+               if( !isCompressed() ){
+                       return super.transposeSelfMatrixMultOperations(out, 
tstype, k);
+               }
+               
+               //multi-threaded tsmm of single uncompressed colgroup
+               if( isSingleUncompressedGroup() ){
+                       return ((ColGroupUncompressed)_colGroups.get(0))
+                               
.getData().transposeSelfMatrixMultOperations(out, tstype, k);
+               }
+               
+               Timing time = LOG.isDebugEnabled() ? new Timing(true) : null;
+               
+               //check for transpose type
+               if( tstype != MMTSJType.LEFT ) //right not supported yet
+                       throw new DMLRuntimeException("Invalid MMTSJ type 
'"+tstype.toString()+"'.");
+               
+               //create output matrix block
+               if( out == null )
+                       out = new MatrixBlock(clen, clen, false);
+               else
+                       out.reset(clen, clen, false);
+               out.allocateDenseBlock();
+               
+               if( !isEmptyBlock(false) ) {
+                       //compute matrix mult
+                       try {
+                               ExecutorService pool = 
Executors.newFixedThreadPool( k );
+                               ArrayList<MatrixMultTransposeTask> tasks = new 
ArrayList<MatrixMultTransposeTask>();
+                               int blklen = 
(int)(Math.ceil((double)clen/(2*k)));
+                               for( int i=0; i<2*k & i*blklen<clen; i++ )
+                                       tasks.add(new 
MatrixMultTransposeTask(_colGroups, out, i*blklen, Math.min((i+1)*blklen, 
clen)));
+                               List<Future<Object>> ret = 
pool.invokeAll(tasks);
+                               for( Future<Object> tret : ret )
+                                       tret.get(); //check for errors
+                               pool.shutdown();
+                       }
+                       catch(Exception ex) {
+                               throw new DMLRuntimeException(ex);
+                       }
+                       
+                       // post-processing
+                       out.recomputeNonZeros();
+               }
+               
+               if( LOG.isDebugEnabled() )
+                       LOG.debug("Compressed TSMM k="+k+" in "+time.stop());
+               
+               return out;
+       }
+
+       
+       /**
+        * Multiply this matrix block by a column vector on the right.
+        * 
+        * @param vector
+        *            right-hand operand of the multiplication
+        * @param result
+        *            buffer to hold the result; must have the appropriate size
+        *            already
+        */
+       private void rightMultByVector(MatrixBlock vector, MatrixBlock result)
+               throws DMLRuntimeException 
+       {
+               // initialize and allocate the result
+               result.allocateDenseBlock();
+
+               // delegate matrix-vector operation to each column group
+               for( ColGroup grp : _colGroups )
+                       if( grp instanceof ColGroupUncompressed ) //overwrites 
output
+                               grp.rightMultByVector(vector, result, 0, 
result.getNumRows());
+               for( ColGroup grp : _colGroups )
+                       if( !(grp instanceof ColGroupUncompressed) ) //adds to 
output
+                               grp.rightMultByVector(vector, result, 0, 
result.getNumRows());
+               
+               // post-processing
+               result.recomputeNonZeros();
+       }
+
+       /**
+        * Multi-threaded version of rightMultByVector.
+        * 
+        * @param vector
+        * @param result
+        * @param k
+        * @throws DMLRuntimeException
+        */
+       private void rightMultByVector(MatrixBlock vector, MatrixBlock result, 
int k)
+               throws DMLRuntimeException 
+       {
+               // initialize and allocate the result
+               result.allocateDenseBlock();
+
+               //multi-threaded execution of all groups
+               try {
+                       ExecutorService pool = Executors.newFixedThreadPool( k 
);
+                       int rlen = getNumRows();
+                       int seqsz = BitmapEncoder.BITMAP_BLOCK_SZ;
+                       int blklen = (int)(Math.ceil((double)rlen/k));
+                       blklen += (blklen%seqsz != 0)?seqsz-blklen%seqsz:0;
+                       ArrayList<RightMatrixMultTask> tasks = new 
ArrayList<RightMatrixMultTask>();
+                       for( int i=0; i<k & i*blklen<getNumRows(); i++ )
+                               tasks.add(new RightMatrixMultTask(_colGroups, 
vector, result, i*blklen, Math.min((i+1)*blklen,rlen)));
+                       pool.invokeAll(tasks);  
+                       pool.shutdown();
+               }
+               catch(Exception ex) {
+                       throw new DMLRuntimeException(ex);
+               }
+               
+               // post-processing
+               result.recomputeNonZeros();
+       }
+       
+       /**
+        * Multiply this matrix block by the transpose of a column vector (i.e.
+        * t(v)%*%X)
+        * 
+        * @param vector
+        *            left-hand operand of the multiplication
+        * @param result
+        *            buffer to hold the result; must have the appropriate size
+        *            already
+        */
+       private static void leftMultByVectorTranspose(List<ColGroup> colGroups, 
MatrixBlock vector, MatrixBlock result, boolean doTranspose) 
+               throws DMLRuntimeException 
+       {
+               //transpose vector if required
+               MatrixBlock rowVector = vector;
+               if (doTranspose) {
+                       rowVector = new MatrixBlock(1, vector.getNumRows(), 
false);
+                       LibMatrixReorg.transpose(vector, rowVector);
+               }
+               
+               // initialize and allocate the result
+               result.reset();
+               result.allocateDenseBlock();
+               
+               // delegate matrix-vector operation to each column group
+               for (ColGroup grp : colGroups) {                        
+                       grp.leftMultByRowVector(rowVector, result);
+               }
+
+               // post-processing
+               result.recomputeNonZeros();
+       }
+       
+       /**
+        * Multi-thread version of leftMultByVectorTranspose.
+        * 
+        * @param vector
+        * @param result
+        * @param doTranspose
+        * @param k
+        * @throws DMLRuntimeException
+        */
+       private static void leftMultByVectorTranspose(List<ColGroup> 
colGroups,MatrixBlock vector, MatrixBlock result, boolean doTranspose, int k) 
+               throws DMLRuntimeException 
+       {
+               int kuc = Math.max(1, k - colGroups.size() + 1);
+               
+               //transpose vector if required
+               MatrixBlock rowVector = vector;
+               if (doTranspose) {
+                       rowVector = new MatrixBlock(1, vector.getNumRows(), 
false);
+                       LibMatrixReorg.transpose(vector, rowVector);
+               }
+               
+               // initialize and allocate the result
+               result.reset();
+               result.allocateDenseBlock();
+
+               //multi-threaded execution
+               try {
+                       ExecutorService pool = Executors.newFixedThreadPool( 
Math.min(colGroups.size(), k) );
+                       ArrayList<LeftMatrixMultTask> tasks = new 
ArrayList<LeftMatrixMultTask>();
+                       for( ColGroup grp : colGroups )
+                               tasks.add(new LeftMatrixMultTask(grp, 
rowVector, result, kuc));
+                       pool.invokeAll(tasks);  
+                       pool.shutdown();
+               }
+               catch(Exception ex) {
+                       throw new DMLRuntimeException(ex);
+               }
+
+               // post-processing
+               result.recomputeNonZeros();
+       }
+
+       /**
+        * 
+        * @param result
+        * @throws DMLRuntimeException
+        */
+       private static void leftMultByTransposeSelf(ArrayList<ColGroup> groups, 
MatrixBlock result, int cl, int cu)
+               throws DMLRuntimeException 
+       {
+               final int numRows = groups.get(0).getNumRows();
+               final int numGroups = groups.size();            
+               
+               //preallocated dense matrix block
+               MatrixBlock lhs = new MatrixBlock(numRows, 1, false);
+               lhs.allocateDenseBlock();
+               
+               //approach: for each colgroup, extract uncompressed columns one 
at-a-time
+               //vector-matrix multiplies against remaining col groups
+               for( int i=cl; i<cu; i++ ) 
+               {
+                       //get current group and relevant col groups
+                       ColGroup group = groups.get(i); 
+                       int[] ixgroup = group.getColIndices();
+                       List<ColGroup> tmpList = groups.subList(i, numGroups);
+                       
+                       //for all uncompressed lhs columns vectors
+                       for( int j=0; j<ixgroup.length; j++ ) {
+                               //decompress single column
+                               lhs.reset(numRows, 1, false);                   
        
+                               group.decompressToBlock(lhs, j);
+                               
+                               if( !lhs.isEmptyBlock(false) ) {
+                                       //compute vector-matrix partial result
+                                       MatrixBlock tmpret = new 
MatrixBlock(1,result.getNumColumns(),false);
+                                       leftMultByVectorTranspose(tmpList, lhs, 
tmpret, true);                                                          
+                                       
+                                       //write partial results (disjoint 
non-zeros)
+                                       
LinearAlgebraUtils.copyNonZerosToRowCol(result, tmpret, ixgroup[j]);    
+                               }
+                       }
+               }
+       }
+       
+       /**
+        * 
+        * @param k
+        * @return
+        */
+       @SuppressWarnings("unchecked")
+       private ArrayList<ColGroup>[] createStaticTaskPartitioning(int k, 
boolean inclUncompressed)
+       {
+               // special case: single uncompressed col group
+               if( _colGroups.size()==1 && _colGroups.get(0) instanceof 
ColGroupUncompressed ){
+                       return new ArrayList[0];
+               }
+               
+               // initialize round robin col group distribution
+               // (static task partitioning to reduce mem requirements/final 
agg)
+               int numTasks = Math.min(k, _colGroups.size());
+               ArrayList<ColGroup>[] grpParts = new ArrayList[numTasks];
+               int pos = 0;
+               for( ColGroup grp : _colGroups ){
+                       if( grpParts[pos]==null )
+                               grpParts[pos] = new ArrayList<ColGroup>();
+                       if( inclUncompressed || !(grp instanceof 
ColGroupUncompressed) ) {
+                               grpParts[pos].add(grp);
+                               pos = (pos==numTasks-1) ? 0 : pos+1;
+                       }
+               }
+               
+               return grpParts;
+       }
+       
+       /**
+        * 
+        * @return
+        */
+       private ColGroupUncompressed getUncompressedColGroup()
+       {
+               for( ColGroup grp : _colGroups )
+                       if( grp instanceof ColGroupUncompressed ) 
+                               return (ColGroupUncompressed)grp;
+               
+               return null;
+       }
+       
+       /**
+        * 
+        */
+       private static class LeftMatrixMultTask implements Callable<Object> 
+       {
+               private ColGroup _group = null;
+               private MatrixBlock _vect = null;
+               private MatrixBlock _ret = null;
+               private int _kuc = 1;
+               
+               protected LeftMatrixMultTask( ColGroup group, MatrixBlock vect, 
MatrixBlock ret, int kuc)  {
+                       _group = group;
+                       _vect = vect;
+                       _ret = ret;
+                       _kuc = kuc;
+               }
+               
+               @Override
+               public Object call() throws DMLRuntimeException 
+               {
+                       // delegate matrix-vector operation to each column group
+                       if( _group instanceof ColGroupUncompressed && _kuc >1 
&& ColGroupBitmap.LOW_LEVEL_OPT )
+                               
((ColGroupUncompressed)_group).leftMultByRowVector(_vect, _ret, _kuc);
+                       else
+                               _group.leftMultByRowVector(_vect, _ret);
+                       return null;
+               }
+       }
+       
+       /**
+        * 
+        */
+       private static class RightMatrixMultTask implements Callable<Object> 
+       {
+               private ArrayList<ColGroup> _groups = null;
+               private MatrixBlock _vect = null;
+               private MatrixBlock _ret = null;
+               private int _rl = -1;
+               private int _ru = -1;
+               
+               protected RightMatrixMultTask( ArrayList<ColGroup> groups, 
MatrixBlock vect, MatrixBlock ret, int rl, int ru)  {
+                       _groups = groups;
+                       _vect = vect;
+                       _ret = ret;
+                       _rl = rl;
+                       _ru = ru;
+               }
+               
+               @Override
+               public Object call() throws DMLRuntimeException 
+               {
+                       // delegate vector-matrix operation to each column group
+                       for( ColGroup grp : _groups )
+                               if( grp instanceof ColGroupUncompressed ) 
//overwrites output
+                                       grp.rightMultByVector(_vect, _ret, _rl, 
_ru);
+                       for( ColGroup grp : _groups )
+                               if( !(grp instanceof ColGroupUncompressed) ) 
//adds to output
+                                       grp.rightMultByVector(_vect, _ret, _rl, 
_ru);
+                       return null;
+               }
+       }
+       
+       private static class MatrixMultTransposeTask implements 
Callable<Object> 
+       {
+               private ArrayList<ColGroup> _groups = null;
+               private MatrixBlock _ret = null;
+               private int _cl = -1;
+               private int _cu = -1;
+               
+               protected MatrixMultTransposeTask(ArrayList<ColGroup> groups, 
MatrixBlock ret, int cl, int cu)  {
+                       _groups = groups;
+                       _ret = ret;
+                       _cl = cl;
+                       _cu = cu;
+               }
+               
+               @Override
+               public Object call() throws DMLRuntimeException {
+                       leftMultByTransposeSelf(_groups, _ret, _cl, _cu);
+                       return null;
+               }
+       }
+       
+       private static class UnaryAggregateTask implements Callable<Object> 
+       {
+               private ArrayList<ColGroup> _groups = null;
+               private MatrixBlock _ret = null;
+               private AggregateUnaryOperator _op = null;
+               
+               protected UnaryAggregateTask( ArrayList<ColGroup> groups, 
MatrixBlock ret, AggregateUnaryOperator op)  {
+                       _groups = groups;
+                       _op = op;
+                       
+                       if( !(_op.indexFn instanceof ReduceRow) ) { 
//sum/rowSums
+                               _ret = new MatrixBlock(ret.getNumRows(), 
ret.getNumColumns(), false);
+                               _ret.allocateDenseBlock();
+                       }
+                       else { //colSums
+                               _ret = ret;
+                       }
+               }
+               
+               @Override
+               public Object call() throws DMLRuntimeException 
+               {
+                       // delegate vector-matrix operation to each column group
+                       for( ColGroup grp : _groups )
+                               grp.unaryAggregateOperations(_op, _ret);
+                       return null;
+               }
+               
+               public MatrixBlock getResult(){
+                       return _ret;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/16e7b1c8/src/main/java/org/apache/sysml/runtime/compress/PlanningBinPacker.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/compress/PlanningBinPacker.java 
b/src/main/java/org/apache/sysml/runtime/compress/PlanningBinPacker.java
new file mode 100644
index 0000000..a76223c
--- /dev/null
+++ b/src/main/java/org/apache/sysml/runtime/compress/PlanningBinPacker.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.compress;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.commons.math3.random.RandomDataGenerator;
+
+/**
+ * Used for the finding columns to co-code
+ * 
+ */
+public class PlanningBinPacker 
+{
+       private final float _binWeight;
+       private final List<Integer> _items;
+       private final List<Float> _itemWeights;
+
+       public PlanningBinPacker(float binWeight, List<Integer> items, 
List<Float> itemWeights) {
+               _binWeight = binWeight;
+               _items = items;
+               _itemWeights = itemWeights;
+       }
+
+       /**
+        * NOTE: upper bound is 17/10 OPT
+        * 
+        * @return key: available space, value: list of the bins that have that 
free space
+        */
+       public TreeMap<Float, List<List<Integer>>> packFirstFit() {
+               return packFirstFit(_items, _itemWeights);
+       }
+
+       /**
+        * shuffling the items to make some potential for having bins of 
different
+        * sizes when consecutive columns are of close cardinalities
+        * 
+        * @return key: available space, value: list of the bins that have that 
free
+        *         space
+        */
+       public TreeMap<Float, List<List<Integer>>> packFirstFitShuffled() {
+               RandomDataGenerator rnd = new RandomDataGenerator();
+               int[] permutation = rnd.nextPermutation(_items.size(), 
_items.size());
+               List<Integer> shuffledItems = new 
ArrayList<Integer>(_items.size());
+               List<Float> shuffledWeights = new 
ArrayList<Float>(_items.size());
+               for (int ix : permutation) {
+                       shuffledItems.add(_items.get(ix));
+                       shuffledWeights.add(_itemWeights.get(ix));
+               }
+
+               return packFirstFit(shuffledItems, shuffledWeights);
+       }
+
+       /**
+        * 
+        * @param items
+        * @param itemWeights
+        * @return
+        */
+       private TreeMap<Float, List<List<Integer>>> packFirstFit(List<Integer> 
items, List<Float> itemWeights) 
+       {
+               // when searching for a bin, the first bin in the list is used
+               TreeMap<Float, List<List<Integer>>> bins = new TreeMap<Float, 
List<List<Integer>>>();
+               // first bin
+               bins.put(_binWeight, createBinList());
+               int numItems = items.size();
+               for (int i = 0; i < numItems; i++) {
+                       float itemWeight = itemWeights.get(i);
+                       Map.Entry<Float, List<List<Integer>>> entry = bins
+                                       .ceilingEntry(itemWeight);
+                       if (entry == null) {
+                               // new bin
+                               float newBinWeight = _binWeight - itemWeight;
+                               List<List<Integer>> binList = 
bins.get(newBinWeight);
+                               if (binList == null) {
+                                       bins.put(newBinWeight, 
createBinList(items.get(i)));
+                               } else {
+                                       List<Integer> newBin = new 
ArrayList<Integer>();
+                                       newBin.add(items.get(i));
+                                       binList.add(newBin);
+                               }
+                       } else {
+                               // add to the first bin in the list
+                               List<Integer> assignedBin = 
entry.getValue().remove(0);
+                               assignedBin.add(items.get(i));
+                               if (entry.getValue().size() == 0)
+                                       bins.remove(entry.getKey());
+                               float newBinWeight = entry.getKey() - 
itemWeight;
+                               List<List<Integer>> newBinsList = 
bins.get(newBinWeight);
+                               if (newBinsList == null) {
+                                       // new bin
+                                       bins.put(newBinWeight, 
createBinList(assignedBin));
+                               } else {
+                                       newBinsList.add(assignedBin);
+                               }
+                       }
+               }
+               return bins;
+       }
+
+       /**
+        * NOTE: upper bound is 11/9 OPT + 6/9 (~1.22 OPT)
+        * 
+        * @return
+        */
+       public TreeMap<Float, List<List<Integer>>> packFirstFitDescending() {
+               // sort items descending based on their weights
+               Integer[] indexes = new Integer[_items.size()];
+               for (int i = 0; i < indexes.length; i++)
+                       indexes[i] = i;
+               Arrays.sort(indexes, new Comparator<Integer>() {
+
+                       @Override
+                       public int compare(Integer o1, Integer o2) {
+                               return 
_itemWeights.get(o1).compareTo(_itemWeights.get(o2));
+                       }
+               });
+               List<Integer> sortedItems = new ArrayList<Integer>();
+               List<Float> sortedItemWeights = new ArrayList<Float>();
+               for (int i = indexes.length - 1; i >= 0; i--) {
+                       sortedItems.add(_items.get(i));
+                       sortedItemWeights.add(_itemWeights.get(i));
+               }
+               return packFirstFit(sortedItems, sortedItemWeights);
+       }
+
+       /**
+        * NOTE: upper bound is 71/60 OPT + 6/9 (~1.18 OPT)
+        * 
+        * @return
+        */
+       public TreeMap<Float, List<List<Integer>>> 
packModifiedFirstFitDescending() {
+               throw new UnsupportedOperationException("Not implemented yet!");
+       }
+
+       /**
+        * 
+        * @return
+        */
+       private List<List<Integer>> createBinList() {
+               List<List<Integer>> binList = new ArrayList<List<Integer>>();
+               binList.add(new ArrayList<Integer>());
+               return binList;
+       }
+
+       /**
+        * 
+        * @param item
+        * @return
+        */
+       private List<List<Integer>> createBinList(int item) {
+               List<List<Integer>> binList = new ArrayList<List<Integer>>();
+               List<Integer> bin = new ArrayList<Integer>();
+               binList.add(bin);
+               bin.add(item);
+               return binList;
+       }
+
+       /**
+        * 
+        * @param bin
+        * @return
+        */
+       private List<List<Integer>> createBinList(List<Integer> bin) {
+               List<List<Integer>> binList = new ArrayList<List<Integer>>();
+               binList.add(bin);
+               return binList;
+       }
+}

Reply via email to