http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/37a215bc/src/main/java/org/apache/sysml/runtime/compress/ColGroupRLE.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/ColGroupRLE.java 
b/src/main/java/org/apache/sysml/runtime/compress/ColGroupRLE.java
index efdcc86..6d2ec43 100644
--- a/src/main/java/org/apache/sysml/runtime/compress/ColGroupRLE.java
+++ b/src/main/java/org/apache/sysml/runtime/compress/ColGroupRLE.java
@@ -22,31 +22,29 @@ package org.apache.sysml.runtime.compress;
 import java.util.Arrays;
 import java.util.Iterator;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.compress.utils.ConverterUtils;
 import org.apache.sysml.runtime.compress.utils.LinearAlgebraUtils;
 import org.apache.sysml.runtime.functionobjects.Builtin;
 import org.apache.sysml.runtime.functionobjects.KahanFunction;
 import org.apache.sysml.runtime.functionobjects.KahanPlus;
-import org.apache.sysml.runtime.functionobjects.KahanPlusSq;
-import org.apache.sysml.runtime.functionobjects.ReduceAll;
-import org.apache.sysml.runtime.functionobjects.ReduceCol;
-import org.apache.sysml.runtime.functionobjects.ReduceRow;
-import org.apache.sysml.runtime.functionobjects.Builtin.BuiltinCode;
 import org.apache.sysml.runtime.instructions.cp.KahanObject;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 import org.apache.sysml.runtime.matrix.data.Pair;
-import org.apache.sysml.runtime.matrix.operators.AggregateUnaryOperator;
 import org.apache.sysml.runtime.matrix.operators.ScalarOperator;
 
 
 /** A group of columns compressed with a single run-length encoded bitmap. */
-public class ColGroupRLE extends ColGroupBitmap 
+public class ColGroupRLE extends ColGroupOffset 
 {
        private static final long serialVersionUID = 7450232907594748177L;
 
+       private static final Log LOG = 
LogFactory.getLog(ColGroupRLE.class.getName());
+       
        public ColGroupRLE() {
-               super(CompressionType.RLE_BITMAP);
+               super();
        }
        
        /**
@@ -62,26 +60,37 @@ public class ColGroupRLE extends ColGroupBitmap
         */
        public ColGroupRLE(int[] colIndices, int numRows, UncompressedBitmap 
ubm) 
        {
-               super(CompressionType.RLE_BITMAP, colIndices, numRows, ubm);
+               super(colIndices, numRows, ubm);
                
                // compress the bitmaps
                final int numVals = ubm.getNumValues();
                char[][] lbitmaps = new char[numVals][];
                int totalLen = 0;
                for( int k=0; k<numVals; k++ ) {
-                       lbitmaps[k] = 
BitmapEncoder.genRLEBitmap(ubm.getOffsetsList(k));
+                       lbitmaps[k] = BitmapEncoder.genRLEBitmap(
+                               ubm.getOffsetsList(k).extractValues(), 
ubm.getNumOffsets(k));
                        totalLen += lbitmaps[k].length;
                }
                
                // compact bitmaps to linearized representation
                createCompressedBitmaps(numVals, totalLen, lbitmaps);
+               
+               //debug output
+               double ucSize = MatrixBlock.estimateSizeDenseInMemory(numRows, 
colIndices.length);
+               if( estimateInMemorySize() > ucSize )
+                       LOG.warn("RLE group larger than UC dense: 
"+estimateInMemorySize()+" "+ucSize);
        }
 
        public ColGroupRLE(int[] colIndices, int numRows, boolean zeros, 
double[] values, char[] bitmaps, int[] bitmapOffs) {
-               super(CompressionType.RLE_BITMAP, colIndices, numRows, zeros, 
values);
+               super(colIndices, numRows, zeros, values);
                _data = bitmaps;
                _ptr = bitmapOffs;
        }
+       
+       @Override
+       public CompressionType getCompType() {
+               return CompressionType.RLE_BITMAP;
+       }
 
        @Override
        public Iterator<Integer> getDecodeIterator(int k) {
@@ -247,7 +256,7 @@ public class ColGroupRLE extends ColGroupBitmap
                        //L3 cache alignment, see comment rightMultByVector OLE 
column group 
                        //core difference of RLE to OLE is that runs are not 
segment alignment,
                        //which requires care of handling runs crossing 
cache-buckets
-                       final int blksz = ColGroupBitmap.WRITE_CACHE_BLKSZ; 
+                       final int blksz = ColGroupOffset.WRITE_CACHE_BLKSZ; 
                        
                        //step 1: prepare position and value arrays
                        
@@ -335,7 +344,7 @@ public class ColGroupRLE extends ColGroupBitmap
                if( LOW_LEVEL_OPT && numVals > 1 
                        && _numRows > BitmapEncoder.BITMAP_BLOCK_SZ ) 
                {
-                       final int blksz = ColGroupBitmap.READ_CACHE_BLKSZ; 
+                       final int blksz = ColGroupOffset.READ_CACHE_BLKSZ; 
                        
                        //step 1: prepare position and value arrays
                        
@@ -423,7 +432,7 @@ public class ColGroupRLE extends ColGroupBitmap
                }
                
                double[] rvalues = applyScalarOp(op, val0, getNumCols());       
        
-               char[] lbitmap = BitmapEncoder.genRLEBitmap(loff);
+               char[] lbitmap = BitmapEncoder.genRLEBitmap(loff, loff.length);
                char[] rbitmaps = Arrays.copyOf(_data, 
_data.length+lbitmap.length);
                System.arraycopy(lbitmap, 0, rbitmaps, _data.length, 
lbitmap.length);
                int[] rbitmapOffs = Arrays.copyOf(_ptr, _ptr.length+1);
@@ -432,49 +441,9 @@ public class ColGroupRLE extends ColGroupBitmap
                return new ColGroupRLE(_colIndexes, _numRows, 
loff.length<_numRows,
                                rvalues, rbitmaps, rbitmapOffs);
        }
-       
-       @Override
-       public void unaryAggregateOperations(AggregateUnaryOperator op, 
MatrixBlock result) 
-               throws DMLRuntimeException
-       {
-               unaryAggregateOperations(op, result, 0, getNumRows());
-       }
-       
-       
-       @Override
-       public void unaryAggregateOperations(AggregateUnaryOperator op, 
MatrixBlock result, int rl, int ru) 
-               throws DMLRuntimeException 
-       {
-               //sum and sumsq (reduceall/reducerow over tuples and counts)
-               if( op.aggOp.increOp.fn instanceof KahanPlus || 
op.aggOp.increOp.fn instanceof KahanPlusSq ) 
-               {
-                       KahanFunction kplus = (op.aggOp.increOp.fn instanceof 
KahanPlus) ?
-                                       KahanPlus.getKahanPlusFnObject() : 
KahanPlusSq.getKahanPlusSqFnObject();
-                       
-                       if( op.indexFn instanceof ReduceAll )
-                               computeSum(result, kplus);
-                       else if( op.indexFn instanceof ReduceCol )
-                               computeRowSums(result, kplus, rl, ru);
-                       else if( op.indexFn instanceof ReduceRow )
-                               computeColSums(result, kplus);
-               }
-               //min and max (reduceall/reducerow over tuples only)
-               else if(op.aggOp.increOp.fn instanceof Builtin 
-                               && 
(((Builtin)op.aggOp.increOp.fn).getBuiltinCode()==BuiltinCode.MAX 
-                               || 
((Builtin)op.aggOp.increOp.fn).getBuiltinCode()==BuiltinCode.MIN)) 
-               {               
-                       Builtin builtin = (Builtin) op.aggOp.increOp.fn;
 
-                       if( op.indexFn instanceof ReduceAll )
-                               computeMxx(result, builtin);
-                       else if( op.indexFn instanceof ReduceCol )
-                               computeRowMxx(result, builtin, rl, ru);
-                       else if( op.indexFn instanceof ReduceRow )
-                               computeColMxx(result, builtin);
-               }
-       }
-
-       private void computeSum(MatrixBlock result, KahanFunction kplus)
+       @Override
+       protected final void computeSum(MatrixBlock result, KahanFunction kplus)
        {
                KahanObject kbuff = new KahanObject(result.quickGetValue(0, 0), 
result.quickGetValue(0, 1));
                
@@ -502,37 +471,93 @@ public class ColGroupRLE extends ColGroupBitmap
                result.quickSetValue(0, 1, kbuff._correction);
        }
 
-       private void computeRowSums(MatrixBlock result, KahanFunction kplus, 
int rl, int ru)
+       @Override
+       protected final void computeRowSums(MatrixBlock result, KahanFunction 
kplus, int rl, int ru)
        {
                KahanObject kbuff = new KahanObject(0, 0);
+               KahanPlus kplus2 = KahanPlus.getKahanPlusFnObject();
+               
                final int numVals = getNumValues();
                double[] c = result.getDenseBlock();
                
-               for (int k = 0; k < numVals; k++) {
-                       int boff = _ptr[k];
-                       int blen = len(k);
-                       double val = sumValues(k);
+               if( ALLOW_CACHE_CONSCIOUS_ROWSUMS 
+                       && LOW_LEVEL_OPT && numVals > 1 
+                       && _numRows > BitmapEncoder.BITMAP_BLOCK_SZ )
+               {
+                       final int blksz = ColGroupOffset.WRITE_CACHE_BLKSZ/2; 
+                       
+                       //step 1: prepare position and value arrays
+                       
+                       //current pos / values per RLE list
+                       int[] astart = new int[numVals];
+                       int[] apos = skipScan(numVals, rl, astart);
+                       double[] aval = sumAllValues(kplus, kbuff);
+                       
+                       //step 2: cache conscious row sums via horizontal 
scans 
+                       for( int bi=rl; bi<ru; bi+=blksz ) 
+                       {
+                               int bimax = Math.min(bi+blksz, ru);
+                                       
+                               //horizontal segment scan, incl pos maintenance
+                               for (int k = 0; k < numVals; k++) {
+                                       int boff = _ptr[k];
+                                       int blen = len(k);
+                                       double val = aval[k];
+                                       int bix = apos[k];
+                                       int start = astart[k];
+                                       
+                                       //compute partial results, not aligned
+                                       while( bix<blen ) {
+                                               int lstart = _data[boff + bix];
+                                               int llen = _data[boff + bix + 
1];
+                                               int from = Math.max(bi, 
start+lstart);
+                                               int to = 
Math.min(start+lstart+llen,bimax);
+                                               for (int rix=from; rix<to; 
rix++) {
+                                                       kbuff.set(c[2*rix], 
c[2*rix+1]);
+                                                       kplus2.execute2(kbuff, 
val);
+                                                       c[2*rix] = kbuff._sum;
+                                                       c[2*rix+1] = 
kbuff._correction;
+                                               }
+                                               if(start+lstart+llen >= bimax)
+                                                       break;
+                                               start += lstart + llen;
+                                               bix += 2;
+                                       }
                                        
-                       if (val != 0.0) {
-                               Pair<Integer,Integer> tmp = skipScanVal(k, rl);
-                               int bix = tmp.getKey();
-                               int curRunStartOff = tmp.getValue();
-                               int curRunEnd = tmp.getValue();
-                               for ( ; bix<blen && curRunEnd<ru; bix+=2) {
-                                       curRunStartOff = curRunEnd + 
_data[boff+bix];
-                                       curRunEnd = curRunStartOff + 
_data[boff+bix+1];
-                                       for (int rix=curRunStartOff; 
rix<curRunEnd && rix<ru; rix++) {
-                                               kbuff.set(c[2*rix], c[2*rix+1]);
-                                               kplus.execute2(kbuff, val);
-                                               c[2*rix] = kbuff._sum;
-                                               c[2*rix+1] = kbuff._correction;
+                                       apos[k] = bix;  
+                                       astart[k] = start;
+                               }
+                       }
+               }
+               else
+               {
+                       for (int k = 0; k < numVals; k++) {
+                               int boff = _ptr[k];
+                               int blen = len(k);
+                               double val = sumValues(k, kplus, kbuff);
+                                               
+                               if (val != 0.0) {
+                                       Pair<Integer,Integer> tmp = 
skipScanVal(k, rl);
+                                       int bix = tmp.getKey();
+                                       int curRunStartOff = tmp.getValue();
+                                       int curRunEnd = tmp.getValue();
+                                       for ( ; bix<blen && curRunEnd<ru; 
bix+=2) {
+                                               curRunStartOff = curRunEnd + 
_data[boff+bix];
+                                               curRunEnd = curRunStartOff + 
_data[boff+bix+1];
+                                               for (int rix=curRunStartOff; 
rix<curRunEnd && rix<ru; rix++) {
+                                                       kbuff.set(c[2*rix], 
c[2*rix+1]);
+                                                       kplus2.execute2(kbuff, 
val);
+                                                       c[2*rix] = kbuff._sum;
+                                                       c[2*rix+1] = 
kbuff._correction;
+                                               }
                                        }
                                }
                        }
                }
        }
 
-       private void computeColSums(MatrixBlock result, KahanFunction kplus)
+       @Override
+       protected final void computeColSums(MatrixBlock result, KahanFunction 
kplus)
        {
                KahanObject kbuff = new KahanObject(0, 0);
                
@@ -561,7 +586,8 @@ public class ColGroupRLE extends ColGroupBitmap
                }
        }
 
-       private void computeRowMxx(MatrixBlock result, Builtin builtin, int rl, 
int ru)
+       @Override
+       protected final void computeRowMxx(MatrixBlock result, Builtin builtin, 
int rl, int ru)
        {
                //NOTE: zeros handled once for all column groups outside
                final int numVals = getNumValues();

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/37a215bc/src/main/java/org/apache/sysml/runtime/compress/ColGroupUncompressed.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/compress/ColGroupUncompressed.java 
b/src/main/java/org/apache/sysml/runtime/compress/ColGroupUncompressed.java
index 9d06bf8..6445c52 100644
--- a/src/main/java/org/apache/sysml/runtime/compress/ColGroupUncompressed.java
+++ b/src/main/java/org/apache/sysml/runtime/compress/ColGroupUncompressed.java
@@ -32,6 +32,7 @@ import org.apache.sysml.runtime.functionobjects.ReduceRow;
 import org.apache.sysml.runtime.matrix.data.LibMatrixAgg;
 import org.apache.sysml.runtime.matrix.data.LibMatrixMult;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+import org.apache.sysml.runtime.matrix.data.SparseBlock.Type;
 import org.apache.sysml.runtime.matrix.operators.AggregateUnaryOperator;
 import org.apache.sysml.runtime.matrix.operators.ScalarOperator;
 import org.apache.sysml.runtime.util.SortUtils;
@@ -53,7 +54,7 @@ public class ColGroupUncompressed extends ColGroup
        private MatrixBlock _data;
 
        public ColGroupUncompressed() {
-               super(CompressionType.UNCOMPRESSED, (int[])null, -1);
+               super((int[])null, -1);
        }
        
        /**
@@ -71,7 +72,7 @@ public class ColGroupUncompressed extends ColGroup
        public ColGroupUncompressed(List<Integer> colIndicesList, MatrixBlock 
rawblock) 
                throws DMLRuntimeException 
        {
-               super(CompressionType.UNCOMPRESSED, colIndicesList, 
+               super(colIndicesList, 
                                CompressedMatrixBlock.TRANSPOSE_INPUT ? 
                                rawblock.getNumColumns() : 
rawblock.getNumRows());
 
@@ -97,7 +98,7 @@ public class ColGroupUncompressed extends ColGroup
                        return;
                }
                
-               // dense implementation for dense and sparse matrices to avoid 
linear search
+               //dense implementation for dense and sparse matrices to avoid 
linear search
                int m = numRows;
                int n = _colIndexes.length;
                for( int i = 0; i < m; i++) {
@@ -109,6 +110,11 @@ public class ColGroupUncompressed extends ColGroup
                        }
                }
                _data.examSparsity();
+               
+               //convert sparse MCSR to read-optimized CSR representation
+               if( _data.isInSparseFormat() ) {
+                       _data = new MatrixBlock(_data, Type.CSR, false);
+               }
        }
 
        /**
@@ -121,8 +127,7 @@ public class ColGroupUncompressed extends ColGroup
         */
        public ColGroupUncompressed(ArrayList<ColGroup> groupsToDecompress) 
        {
-               super(CompressionType.UNCOMPRESSED, 
-                               mergeColIndices(groupsToDecompress),
+               super(mergeColIndices(groupsToDecompress),
                                groupsToDecompress.get(0)._numRows);
 
                // Invert the list of column indices
@@ -152,10 +157,14 @@ public class ColGroupUncompressed extends ColGroup
         */
        public ColGroupUncompressed(int[] colIndices, int numRows, MatrixBlock 
data) 
        {
-               super(CompressionType.UNCOMPRESSED, colIndices, numRows);
+               super(colIndices, numRows);
                _data = data;
        }
 
+       @Override
+       public CompressionType getCompType() {
+               return CompressionType.UNCOMPRESSED;
+       }
 
        /**
         * Access for superclass
@@ -276,6 +285,23 @@ public class ColGroupUncompressed extends ColGroup
                LibMatrixMult.matrixMult(_data, shortVector, result, rl, ru);   
        }
        
+       public void rightMultByVector(MatrixBlock vector, MatrixBlock result, 
int k)
+                       throws DMLRuntimeException 
+       {
+               // Pull out the relevant rows of the vector
+               int clen = _colIndexes.length;
+               
+               MatrixBlock shortVector = new MatrixBlock(clen, 1, false);
+               shortVector.allocateDenseBlock();
+               double[] b = shortVector.getDenseBlock();
+               for (int colIx = 0; colIx < clen; colIx++)
+                       b[colIx] = vector.quickGetValue(_colIndexes[colIx], 0);
+               shortVector.recomputeNonZeros();
+               
+               // Multiply the selected columns by the appropriate parts of 
the vector
+               LibMatrixMult.matrixMult(_data, shortVector, result, k);        
+       }
+       
        @Override
        public void leftMultByRowVector(MatrixBlock vector, MatrixBlock result)
                        throws DMLRuntimeException 
@@ -377,8 +403,7 @@ public class ColGroupUncompressed extends ColGroup
        }
        
        @Override
-       protected void countNonZerosPerRow(int[] rnnz, int rl, int ru)
-       {
+       protected void countNonZerosPerRow(int[] rnnz, int rl, int ru) {
                for( int i=rl; i<ru; i++ )
                        rnnz[i-rl] += _data.recomputeNonZeros(i, i, 0, 
_data.getNumColumns()-1);
        }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/37a215bc/src/main/java/org/apache/sysml/runtime/compress/ColGroupValue.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/ColGroupValue.java 
b/src/main/java/org/apache/sysml/runtime/compress/ColGroupValue.java
new file mode 100644
index 0000000..b3b5e80
--- /dev/null
+++ b/src/main/java/org/apache/sysml/runtime/compress/ColGroupValue.java
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.compress;
+
+import java.util.Arrays;
+
+import org.apache.sysml.runtime.DMLRuntimeException;
+import org.apache.sysml.runtime.functionobjects.Builtin;
+import org.apache.sysml.runtime.functionobjects.Builtin.BuiltinCode;
+import org.apache.sysml.runtime.functionobjects.KahanFunction;
+import org.apache.sysml.runtime.functionobjects.KahanPlus;
+import org.apache.sysml.runtime.instructions.cp.KahanObject;
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+import org.apache.sysml.runtime.matrix.operators.AggregateUnaryOperator;
+import org.apache.sysml.runtime.matrix.operators.ScalarOperator;
+
+
+/**
+ * Base class for column groups encoded with value dictionary.
+ * 
+ */
+public abstract class ColGroupValue extends ColGroup 
+{      
+       private static final long serialVersionUID = 3786247536054353658L;
+               
+       public static boolean LOW_LEVEL_OPT = true;     
+       
+       //sorting of values by physical length helps by 10-20%, especially for 
serial, while
+       //slight performance decrease for parallel incl multi-threaded, hence 
not applied for
+       //distributed operations (also because compression time + garbage 
collection increases)
+       public static final boolean SORT_VALUES_BY_LENGTH = true; 
+               
+       
+       /** Distinct values associated with individual bitmaps. */
+       protected double[] _values; //linearized <numcol vals> <numcol vals>
+       
+       public ColGroupValue() {
+               super((int[]) null, -1);
+       }
+       
+       /**
+        * Main constructor. Stores the headers for the individual bitmaps.
+        * 
+        * @param colIndices
+        *            indices (within the block) of the columns included in this
+        *            column group
+        * @param numRows
+        *            total number of rows in the parent block
+        * @param ubm
+        *            Uncompressed bitmap representation of the block
+        */
+       public ColGroupValue(int[] colIndices, int numRows, UncompressedBitmap 
ubm) 
+       {
+               super(colIndices, numRows);
+
+               // sort values by frequency, if requested 
+               if( LOW_LEVEL_OPT && SORT_VALUES_BY_LENGTH 
+                               && numRows > BitmapEncoder.BITMAP_BLOCK_SZ ) {
+                       ubm.sortValuesByFrequency();
+               }
+
+               // extract and store distinct values (bitmaps handled by 
subclasses)
+               _values = ubm.getValues();
+       }
+
+       /**
+        * Constructor for subclass methods that need to create shallow copies
+        * 
+        * @param colIndices
+        *            raw column index information
+        * @param numRows
+        *            number of rows in the block
+        * @param values
+        *            set of distinct values for the block (associated bitmaps 
are
+        *            kept in the subclass)
+        */
+       protected ColGroupValue(int[] colIndices, int numRows, double[] values) 
{
+               super(colIndices, numRows);
+               _values = values;
+       }
+       
+       @Override
+       public long estimateInMemorySize() {
+               long size = super.estimateInMemorySize();
+               
+               // adding the size of values
+               size += 8; //array reference
+               if (_values != null) {
+                       size += 32 + _values.length * 8; //values
+               }
+       
+               return size;
+       }
+
+       /**
+        * Obtain number of distinct sets of values associated with the 
bitmaps in this column group.
+        * 
+        * @return the number of distinct sets of values associated with the 
bitmaps
+        *         in this column group
+        */
+       public int getNumValues() {
+               return _values.length / _colIndexes.length;
+       }
+
+       public double[] getValues() {
+               return _values;
+       }
+       
+       protected int containsAllZeroValue() {
+               int numVals = getNumValues();
+               int numCols = getNumCols();
+               for( int i=0, off=0; i<numVals; i++, off+=numCols ) {
+                       boolean allZeros = true;
+                       for( int j=0; j<numCols; j++ )
+                               allZeros &= (_values[off+j] == 0);
+                       if( allZeros )
+                               return i;
+               }
+               return -1;
+       }
+       
+       protected final double sumValues(int valIx) {
+               final int numCols = getNumCols();
+               final int valOff = valIx * numCols;
+               double val = 0.0;
+               for( int i = 0; i < numCols; i++ ) {
+                       val += _values[valOff+i];
+               }
+               
+               return val;
+       }
+       
+       protected final double sumValues(int valIx, KahanFunction kplus, 
KahanObject kbuff) {
+               final int numCols = getNumCols();
+               final int valOff = valIx * numCols;
+               kbuff.set(0, 0);
+               for( int i = 0; i < numCols; i++ ) {
+                       kplus.execute2(kbuff, _values[valOff+i]);
+               }
+               
+               return kbuff._sum;
+       }
+       
+       protected final double[] sumAllValues(KahanFunction kplus, KahanObject 
kbuff) {
+               //quick path: sum 
+               if( getNumCols()==1 && kplus instanceof KahanPlus )
+                       return _values; //shallow copy of values
+               
+               //pre-aggregate value tuple 
+               final int numVals = getNumValues();
+               double[] ret = new double[numVals];
+               for( int k=0; k<numVals; k++ )
+                       ret[k] = sumValues(k, kplus, kbuff);
+               
+               return ret;
+       }
+       
+       protected final double sumValues(int valIx, double[] b) {
+               final int numCols = getNumCols();
+               final int valOff = valIx * numCols;
+               double val = 0;
+               for( int i = 0; i < numCols; i++ ) {
+                       val += _values[valOff+i] * b[i];
+               }
+               
+               return val;
+       }
+
+       protected final double[] preaggValues(int numVals, double[] b) {
+               double[] ret = new double[numVals];
+               for( int k = 0; k < numVals; k++ )
+                       ret[k] = sumValues(k, b);
+               
+               return ret;
+       }
+       
+       /**
+        * NOTE: Shared across OLE/RLE/DDC because value-only computation. 
+        * 
+        * @param result output matrix block
+        * @param builtin function object
+        * @param zeros indicator if column group contains zero values
+        */
+       protected void computeMxx(MatrixBlock result, Builtin builtin, boolean 
zeros) 
+       {
+               //init and 0-value handling
+               double val = Double.MAX_VALUE * 
((builtin.getBuiltinCode()==BuiltinCode.MAX)?-1:1);
+               if( zeros )
+                       val = builtin.execute2(val, 0);
+               
+               //iterate over all values only
+               final int numVals = getNumValues();
+               final int numCols = getNumCols();               
+               for (int k = 0; k < numVals; k++)
+                       for( int j=0, valOff = k*numCols; j<numCols; j++ )
+                               val = builtin.execute2(val, _values[ valOff+j 
]);
+               
+               //compute new partial aggregate
+               val = builtin.execute2(val, result.quickGetValue(0, 0));
+               result.quickSetValue(0, 0, val);
+       }
+       
+       /**
+        * NOTE: Shared across OLE/RLE/DDC because value-only computation. 
+        * 
+        * @param result output matrix block
+        * @param builtin function object
+        * @param zeros indicator if column group contains zero values
+        */
+       protected void computeColMxx(MatrixBlock result, Builtin builtin, 
boolean zeros)
+       {
+               final int numVals = getNumValues();
+               final int numCols = getNumCols();
+               
+               //init and 0-value handling
+               double[] vals = new double[numCols];
+               Arrays.fill(vals, Double.MAX_VALUE * 
((builtin.getBuiltinCode()==BuiltinCode.MAX)?-1:1));
+               if( zeros ) {
+                       for( int j = 0; j < numCols; j++ )
+                               vals[j] = builtin.execute2(vals[j], 0);         
+               }
+               
+               //iterate over all values only
+               for (int k = 0; k < numVals; k++) 
+                       for( int j=0, valOff=k*numCols; j<numCols; j++ )
+                               vals[j] = builtin.execute2(vals[j], _values[ 
valOff+j ]);
+               
+               //copy results to output
+               for( int j=0; j<numCols; j++ )
+                       result.quickSetValue(0, _colIndexes[j], vals[j]);
+       }
+       
+       /**
+        * Method for use by subclasses. Applies a scalar operation to the value
+        * metadata stored in the superclass.
+        * 
+        * @param op
+        *            scalar operation to perform
+        * @return transformed copy of value metadata for this column group
+        * @throws DMLRuntimeException if DMLRuntimeException occurs
+        */
+       protected double[] applyScalarOp(ScalarOperator op)
+               throws DMLRuntimeException 
+       {
+               //scan over linearized values
+               double[] ret = new double[_values.length];
+               for (int i = 0; i < _values.length; i++) {
+                       ret[i] = op.executeScalar(_values[i]);
+               }
+
+               return ret;
+       }
+
+       protected double[] applyScalarOp(ScalarOperator op, double newVal, int 
numCols)
+               throws DMLRuntimeException 
+       {
+               //scan over linearized values
+               double[] ret = new double[_values.length + numCols];
+               for( int i = 0; i < _values.length; i++ ) {
+                       ret[i] = op.executeScalar(_values[i]);
+               }
+               
+               //add new value to the end
+               Arrays.fill(ret, _values.length, _values.length+numCols, 
newVal);
+               
+               return ret;
+       }
+       
+       @Override
+       public void unaryAggregateOperations(AggregateUnaryOperator op, 
MatrixBlock result) 
+               throws DMLRuntimeException 
+       {
+               unaryAggregateOperations(op, result, 0, getNumRows());
+       }
+       
+       /**
+        * 
+        * @param op aggregation operator
+        * @param result output matrix block
+        * @param rl row lower index, inclusive
+        * @param ru row upper index, exclusive
+        * @throws DMLRuntimeException if DMLRuntimeException occurs
+        */
+       public abstract void unaryAggregateOperations(AggregateUnaryOperator 
op, MatrixBlock result, int rl, int ru)
+               throws DMLRuntimeException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/37a215bc/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java 
b/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
index 48ebcc5..84c4812 100644
--- a/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
@@ -46,6 +46,7 @@ import org.apache.sysml.lops.MMTSJ.MMTSJType;
 import org.apache.sysml.lops.MapMultChain.ChainType;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.compress.ColGroup.CompressionType;
+import org.apache.sysml.runtime.compress.cocode.PlanningCoCoder;
 import org.apache.sysml.runtime.compress.estim.CompressedSizeEstimator;
 import org.apache.sysml.runtime.compress.estim.CompressedSizeInfo;
 import org.apache.sysml.runtime.compress.estim.SizeEstimatorFactory;
@@ -56,12 +57,14 @@ import 
org.apache.sysml.runtime.controlprogram.caching.MatrixObject.UpdateType;
 import org.apache.sysml.runtime.controlprogram.parfor.stat.Timing;
 import org.apache.sysml.runtime.functionobjects.Builtin;
 import org.apache.sysml.runtime.functionobjects.Builtin.BuiltinCode;
+import org.apache.sysml.runtime.functionobjects.KahanFunction;
 import org.apache.sysml.runtime.functionobjects.KahanPlus;
 import org.apache.sysml.runtime.functionobjects.KahanPlusSq;
 import org.apache.sysml.runtime.functionobjects.Multiply;
 import org.apache.sysml.runtime.functionobjects.ReduceAll;
 import org.apache.sysml.runtime.functionobjects.ReduceCol;
 import org.apache.sysml.runtime.instructions.cp.CM_COV_Object;
+import org.apache.sysml.runtime.instructions.cp.KahanObject;
 import org.apache.sysml.runtime.instructions.cp.ScalarObject;
 import org.apache.sysml.runtime.matrix.data.CTableMap;
 import org.apache.sysml.runtime.matrix.data.LibMatrixBincell;
@@ -98,7 +101,9 @@ public class CompressedMatrixBlock extends MatrixBlock 
implements Externalizable
        public static final boolean MATERIALIZE_ZEROS = false;
        public static final long MIN_PAR_AGG_THRESHOLD = 16*1024*1024; //16MB
        public static final boolean INVESTIGATE_ESTIMATES = false;
-       private static final boolean LDEBUG = false; //local debug flag
+       public static boolean ALLOW_DDC_ENCODING = true;
+       private static final boolean LDEBUG = true; //local debug flag
+       private static final Level LDEBUG_LEVEL = Level.DEBUG; //DEBUG/TRACE 
for details
        
        private static final Log LOG = 
LogFactory.getLog(CompressedMatrixBlock.class.getName());
        
@@ -106,7 +111,7 @@ public class CompressedMatrixBlock extends MatrixBlock 
implements Externalizable
                // for internal debugging only
                if( LDEBUG ) {
                        Logger.getLogger("org.apache.sysml.runtime.compress")
-                                 .setLevel((Level) Level.DEBUG);
+                                 .setLevel((Level) LDEBUG_LEVEL);
                }       
        }
        
@@ -231,7 +236,6 @@ public class CompressedMatrixBlock extends MatrixBlock 
implements Externalizable
                final int numRows = getNumRows();
                final int numCols = getNumColumns();
                final boolean sparse = isInSparseFormat();
-               final double sp = OptimizerUtils.getSparsity(numRows, numCols, 
getNonZeros());
                MatrixBlock rawblock = !TRANSPOSE_INPUT ? new MatrixBlock(this) 
:
                        LibMatrixReorg.transpose(this, new MatrixBlock(numCols, 
numRows, sparse), k);
                
@@ -239,45 +243,50 @@ public class CompressedMatrixBlock extends MatrixBlock 
implements Externalizable
                CompressedSizeEstimator bitmapSizeEstimator = 
                                SizeEstimatorFactory.getSizeEstimator(rawblock, 
numRows);
 
-               // The current implementation of this method is written for 
correctness,
-               // not for performance or for minimal use of temporary space.
-
-               // We start with a full set of columns.
-               HashSet<Integer> remainingCols = new HashSet<Integer>();
-               for (int i = 0; i < numCols; i++)
-                       remainingCols.add(i);
-
                // PHASE 1: Classify columns by compression type
-               // We start by determining which columns are amenable to bitmap 
compression
-               double uncompressedColumnSize = getUncompressedSize(numRows, 1, 
sp);
-
-               // information about the bitmap amenable columns
-               List<Integer> bitmapCols = new ArrayList<Integer>();
-               List<Integer> uncompressedCols = new ArrayList<Integer>();
-               List<Integer> colsCards = new ArrayList<Integer>();
-               List<Long> compressedSizes = new ArrayList<Long>();
-               HashMap<Integer, Double> compressionRatios = new 
HashMap<Integer, Double>();
+               // We start by determining which columns are amenable to 
compression
+               List<Integer> colsC = new ArrayList<Integer>();
+               List<Integer> colsUC = new ArrayList<Integer>();
+               HashMap<Integer, Double> compRatios = new HashMap<Integer, 
Double>();
                
-               // Classify columns according to ration (size uncompressed / 
size compressed), 
+               // Classify columns according to ratio (size uncompressed / 
size compressed), 
                // where a column is compressible if ratio > 1.
                CompressedSizeInfo[] sizeInfos = (k > 1) ?
                                computeCompressedSizeInfos(bitmapSizeEstimator, 
numCols, k) : 
-                               computeCompressedSizeInfos(bitmapSizeEstimator, 
numCols);               
+                               computeCompressedSizeInfos(bitmapSizeEstimator, 
numCols);       
+               long nnzUC = 0;         
                for (int col = 0; col < numCols; col++)  {      
-                       long compressedSize = sizeInfos[col].getMinSize();
-                       double compRatio = uncompressedColumnSize / 
compressedSize;                     
-                       if (compRatio > 1) {
-                               bitmapCols.add(col);
-                               compressionRatios.put(col, compRatio);
-                               
colsCards.add(sizeInfos[col].getEstCarinality());
-                               compressedSizes.add(compressedSize);
+                       double uncompSize = getUncompressedSize(numRows, 1, 
+                               OptimizerUtils.getSparsity(numRows, 1, 
sizeInfos[col].getEstNnz()));
+                       double compRatio = uncompSize / 
sizeInfos[col].getMinSize();                    
+                       if( compRatio > 1 ) {
+                               colsC.add(col);
+                               compRatios.put(col, compRatio);
+                       }
+                       else {
+                               colsUC.add(col); 
+                               nnzUC += sizeInfos[col].getEstNnz();
                        }
-                       else
-                               uncompressedCols.add(col);
                }
-
-               _stats.timePhase1 = time.stop();
+               
+               // correction of column classification (reevaluate dense 
estimates if necessary)
+               boolean sparseUC = 
MatrixBlock.evalSparseFormatInMemory(numRows, colsUC.size(), nnzUC);
+               if( !sparseUC && !colsUC.isEmpty() ) {
+                       for( int i=0; i<colsUC.size(); i++ ) {
+                               int col = colsUC.get(i);
+                               double uncompSize = 
getUncompressedSize(numRows, 1, 1.0);
+                               double compRatio = uncompSize / 
sizeInfos[col].getMinSize();                    
+                               if( compRatio > 1 ) {
+                                       colsC.add(col);
+                                       colsUC.remove(i); i--;
+                                       compRatios.put(col, compRatio);
+                                       nnzUC -= sizeInfos[col].getEstNnz();
+                               }
+                       }
+               }
+               
                if( LOG.isDebugEnabled() ) {
+                       _stats.timePhase1 = time.stop();
                        LOG.debug("Compression statistics:");
                        LOG.debug("--compression phase 1: "+_stats.timePhase1);
                }
@@ -285,26 +294,28 @@ public class CompressedMatrixBlock extends MatrixBlock 
implements Externalizable
                // PHASE 2: Grouping columns
                // Divide the bitmap columns into column groups.
                List<int[]> bitmapColGrps = 
PlanningCoCoder.findCocodesByPartitioning(
-                               bitmapSizeEstimator, bitmapCols, colsCards, 
compressedSizes, numRows, 
-                               isInSparseFormat() ? sp : 1, k);
+                               bitmapSizeEstimator, colsC, sizeInfos, numRows, 
k);
 
-               _stats.timePhase2 = time.stop();
-               if( LOG.isDebugEnabled() )
+               if( LOG.isDebugEnabled() ) {
+                       _stats.timePhase2 = time.stop();
                        LOG.debug("--compression phase 2: "+_stats.timePhase2);
-               
+               }
+                       
                if( INVESTIGATE_ESTIMATES ) {
                        double est = 0;
                        for( int[] groupIndices : bitmapColGrps )
                                est += 
bitmapSizeEstimator.estimateCompressedColGroupSize(groupIndices).getMinSize();
-                       est += uncompressedCols.size() * uncompressedColumnSize;
+                       est += MatrixBlock.estimateSizeInMemory(numRows, 
colsUC.size(), 
+                                       OptimizerUtils.getSparsity(numRows, 
colsUC.size(), nnzUC));
                        _stats.estSize = est;
                }
                
                // PHASE 3: Compress and correct sample-based decisions
                ColGroup[] colGroups = (k > 1) ?
-                               compressColGroups(rawblock, 
bitmapSizeEstimator, compressionRatios, numRows, sp, bitmapColGrps, k) : 
-                               compressColGroups(rawblock, 
bitmapSizeEstimator, compressionRatios, numRows, sp, bitmapColGrps);        
+                               compressColGroups(rawblock, 
bitmapSizeEstimator, compRatios, numRows, bitmapColGrps, colsUC.isEmpty(), k) : 
+                               compressColGroups(rawblock, 
bitmapSizeEstimator, compRatios, numRows, bitmapColGrps, colsUC.isEmpty());     
    
                allocateColGroupList();
+               HashSet<Integer> remainingCols = seq(0, numCols-1, 1);
                for( int j=0; j<colGroups.length; j++ ) {
                        if( colGroups[j] != null ) {
                                for( int col : colGroups[j].getColIndices() )
@@ -313,10 +324,11 @@ public class CompressedMatrixBlock extends MatrixBlock 
implements Externalizable
                        }
                }
                
-               _stats.timePhase3 = time.stop();
-               if( LOG.isDebugEnabled() )
+               if( LOG.isDebugEnabled() ) {
+                       _stats.timePhase3 = time.stop();
                        LOG.debug("--compression phase 3: "+_stats.timePhase3);
-               
+               }
+                       
                // Phase 4: Cleanup
                // The remaining columns are stored uncompressed as one big 
column group
                if( !remainingCols.isEmpty() ) {
@@ -332,10 +344,15 @@ public class CompressedMatrixBlock extends MatrixBlock 
implements Externalizable
                rawblock.cleanupBlock(true, true);
                this.cleanupBlock(true, true);
                
-               _stats.timePhase4 = time.stop();
                if( LOG.isDebugEnabled() ) {
+                       _stats.timePhase4 = time.stop();
+                       int[] counts = getColGroupCounts(_colGroups);
                        LOG.debug("--compression phase 4: "+_stats.timePhase4);
                        LOG.debug("--num col groups: "+_colGroups.size());
+                       LOG.debug("--col groups types (OLE,RLE,DDC1,DDC2,UC): "
+                                       
+counts[2]+","+counts[1]+","+counts[3]+","+counts[4]+","+counts[0]);
+                       LOG.debug("--col groups sizes (OLE,RLE,DDC1,DDC2,UC): "
+                                       
+counts[7]+","+counts[6]+","+counts[8]+","+counts[9]+","+counts[5]);
                        LOG.debug("--compressed size: "+_stats.size);
                        LOG.debug("--compression ratio: "+_stats.ratio);
                }
@@ -345,6 +362,22 @@ public class CompressedMatrixBlock extends MatrixBlock 
implements Externalizable
                return _stats;
        }
 
+       /**
+        * Get per-type statistics of the given col groups: positions 0-4 hold the
+        * number of groups and positions 5-9 the number of columns, by enum ordinal.
+        * 
+        * @param colgroups list of column groups
+        * @return counts 
+        */
+       private static int[] getColGroupCounts(ArrayList<ColGroup> colgroups) {
+               int[] ret = new int[10]; //5 x count, 5 x num_columns
+               for( ColGroup c : colgroups ) {
+                       ret[c.getCompType().ordinal()] ++;
+                       ret[5+c.getCompType().ordinal()] += c.getNumCols();
+               }
+               return ret;
+       }
+       
        private static CompressedSizeInfo[] 
computeCompressedSizeInfos(CompressedSizeEstimator estim, int clen) {
                CompressedSizeInfo[] ret = new CompressedSizeInfo[clen];
                for( int col=0; col<clen; col++ )
@@ -372,23 +405,23 @@ public class CompressedMatrixBlock extends MatrixBlock 
implements Externalizable
                }
        }
 
-       private static ColGroup[] compressColGroups(MatrixBlock in, 
CompressedSizeEstimator estim, HashMap<Integer, Double> compRatios, int rlen, 
double sp, List<int[]> groups)
+       private static ColGroup[] compressColGroups(MatrixBlock in, 
CompressedSizeEstimator estim, HashMap<Integer, Double> compRatios, int rlen, 
List<int[]> groups, boolean denseEst)
        {
                ColGroup[] ret = new ColGroup[groups.size()];
                for( int i=0; i<groups.size(); i++ )
-                       ret[i] = compressColGroup(in, estim, compRatios, rlen, 
sp, groups.get(i));
+                       ret[i] = compressColGroup(in, estim, compRatios, rlen, 
groups.get(i), denseEst);
                
                return ret;
        }
 
-       private static ColGroup[] compressColGroups(MatrixBlock in, 
CompressedSizeEstimator estim, HashMap<Integer, Double> compRatios, int rlen, 
double sp, List<int[]> groups, int k) 
+       private static ColGroup[] compressColGroups(MatrixBlock in, 
CompressedSizeEstimator estim, HashMap<Integer, Double> compRatios, int rlen, 
List<int[]> groups, boolean denseEst, int k) 
                throws DMLRuntimeException
        {
                try {
                        ExecutorService pool = Executors.newFixedThreadPool( k 
);
                        ArrayList<CompressTask> tasks = new 
ArrayList<CompressTask>();
                        for( int[] colIndexes : groups )
-                               tasks.add(new CompressTask(in, estim, 
compRatios, rlen, sp, colIndexes));
+                               tasks.add(new CompressTask(in, estim, 
compRatios, rlen, colIndexes, denseEst));
                        List<Future<ColGroup>> rtask = pool.invokeAll(tasks);   
                        ArrayList<ColGroup> ret = new ArrayList<ColGroup>();
                        for( Future<ColGroup> lrtask : rtask )
@@ -401,7 +434,7 @@ public class CompressedMatrixBlock extends MatrixBlock 
implements Externalizable
                }
        }
 
-       private static ColGroup compressColGroup(MatrixBlock in, 
CompressedSizeEstimator estim, HashMap<Integer, Double> compRatios, int rlen, 
double sp, int[] colIndexes) 
+       private static ColGroup compressColGroup(MatrixBlock in, 
CompressedSizeEstimator estim, HashMap<Integer, Double> compRatios, int rlen, 
int[] colIndexes, boolean denseEst) 
        {
                int[] allGroupIndices = null;
                int allColsCount = colIndexes.length;
@@ -416,12 +449,13 @@ public class CompressedMatrixBlock extends MatrixBlock 
implements Externalizable
                        //exact big list and observe compression ratio
                        ubm = BitmapEncoder.extractBitmap(colIndexes, in); 
                        sizeInfo = estim.estimateCompressedColGroupSize(ubm);   
-                       double compRatio = getUncompressedSize(rlen, 
colIndexes.length, sp) / sizeInfo.getMinSize();
-                       
+                       double sp2 = denseEst ? 1.0 : 
OptimizerUtils.getSparsity(rlen, 1, ubm.getNumOffsets());
+                       double compRatio = getUncompressedSize(rlen, 
colIndexes.length, sp2) / sizeInfo.getMinSize();
+               
                        if( compRatio > 1 ) {
                                break; // we have a good group
                        } 
-                       
+
                        // modify the group
                        if (compRatioPQ == null) {
                                // first modification
@@ -454,9 +488,17 @@ public class CompressedMatrixBlock extends MatrixBlock 
implements Externalizable
                //create compressed column group
                long rleSize = sizeInfo.getRLESize();
                long oleSize = sizeInfo.getOLESize();
-               if( rleSize < oleSize )
+               long ddcSize = sizeInfo.getDDCSize();
+               
+               if( ALLOW_DDC_ENCODING && ddcSize < rleSize && ddcSize < 
oleSize ) {
+                       if( ubm.getNumValues()<=255 )
+                               return new ColGroupDDC1(colIndexes, rlen, ubm);
+                       else
+                               return new ColGroupDDC2(colIndexes, rlen, ubm); 
+               }
+               else if( rleSize < oleSize )
                        return new ColGroupRLE(colIndexes, rlen, ubm);
-               else
+               else 
                        return new ColGroupOLE(colIndexes, rlen, ubm);
        }
        
@@ -469,10 +511,11 @@ public class CompressedMatrixBlock extends MatrixBlock 
implements Externalizable
         * @return estimate of uncompressed size of column group
         */
        private static double getUncompressedSize(int rlen, int clen, double 
sparsity) {
-               //we estimate the uncompressed size as 8 * nnz in order to 
cover both
-               //sparse and dense with moderate underestimation (which is 
conservative as 
-               //it is biased towards uncompressed columns)
-               return 8 * rlen * clen * sparsity;
+               //we estimate the uncompressed size as the minimum of dense 
representation
+               //and representation in csr, which moderately overestimates 
sparse representations
+               //of single columns but helps avoid anomalies with sparse 
columns that are
+               //eventually represented in dense
+               return Math.min(8d * rlen * clen, 4d * rlen + 12d * rlen * clen 
* sparsity);
        }
 
        /**
@@ -587,8 +630,8 @@ public class CompressedMatrixBlock extends MatrixBlock 
implements Externalizable
        }
 
        private static class CompressedColumn implements 
Comparable<CompressedColumn> {
-               int colIx;
-               double compRatio;
+               final int colIx;
+               final double compRatio;
 
                public CompressedColumn(int colIx, double compRatio) {
                        this.colIx = colIx;
@@ -613,6 +656,13 @@ public class CompressedMatrixBlock extends MatrixBlock 
implements Externalizable
                public CompressionStatistics() {
                        //do nothing
                }
+               
+               public CompressionStatistics(double t1, double t2, double t3, 
double t4){
+                       timePhase1 = t1;
+                       timePhase2 = t2;
+                       timePhase3 = t3;
+                       timePhase4 = t4;
+               }
        } 
 
        @Override
@@ -681,6 +731,10 @@ public class CompressedMatrixBlock extends MatrixBlock 
implements Externalizable
                                        grp = new ColGroupOLE(); break;
                                case RLE_BITMAP:
                                        grp = new ColGroupRLE(); break;
+                               case DDC1:
+                                       grp = new ColGroupDDC1(); break;
+                               case DDC2:
+                                       grp = new ColGroupDDC2(); break;        
                        }
                        
                        //deserialize and add column group
@@ -1040,11 +1094,22 @@ public class CompressedMatrixBlock extends MatrixBlock 
implements Externalizable
                                
                                //aggregate partial results
                                if( op.indexFn instanceof ReduceAll ) {
-                                       double val = ret.quickGetValue(0, 0);
-                                       for( Future<MatrixBlock> rtask : rtasks 
)
-                                               val = 
op.aggOp.increOp.fn.execute(val, 
-                                                               
rtask.get().quickGetValue(0, 0));
-                                       ret.quickSetValue(0, 0, val);
+                                       if( op.aggOp.increOp.fn instanceof 
KahanFunction ) {
+                                               KahanObject kbuff = new 
KahanObject(ret.quickGetValue(0, 0), 0);
+                                               for( Future<MatrixBlock> rtask 
: rtasks ) {
+                                                       double tmp = 
rtask.get().quickGetValue(0, 0);
+                                                       ((KahanFunction) 
op.aggOp.increOp.fn).execute2(kbuff, tmp);
+                                               }
+                                               ret.quickSetValue(0, 0, 
kbuff._sum);
+                                       }       
+                                       else {
+                                               double val = 
ret.quickGetValue(0, 0);
+                                               for( Future<MatrixBlock> rtask 
: rtasks ) {
+                                                       double tmp = 
rtask.get().quickGetValue(0, 0);
+                                                       val = 
op.aggOp.increOp.fn.execute(val, tmp);
+                                               }
+                                               ret.quickSetValue(0, 0, val);
+                                       }
                                }               
                        }
                        catch(Exception ex) {
@@ -1058,9 +1123,7 @@ public class CompressedMatrixBlock extends MatrixBlock 
implements Externalizable
                                        grp.unaryAggregateOperations(op, ret);
                        
                        //process OLE/RLE column groups
-                       for (ColGroup grp : _colGroups)
-                               if( !(grp instanceof ColGroupUncompressed) )
-                                       grp.unaryAggregateOperations(op, ret);
+                       aggregateUnaryOperations(op, _colGroups, ret, 0, rlen);
                }
                
                //special handling zeros for rowmins/rowmax
@@ -1089,6 +1152,41 @@ public class CompressedMatrixBlock extends MatrixBlock 
implements Externalizable
        }
        
        @Override
+       public MatrixValue aggregateUnaryOperations(AggregateUnaryOperator op,
+                       MatrixValue result, int blockingFactorRow, int 
blockingFactorCol,
+                       MatrixIndexes indexesIn) throws DMLRuntimeException {
+               return aggregateUnaryOperations(op, result, 
+                               blockingFactorRow, blockingFactorCol, 
indexesIn, false);
+       }
+       
+       private static void aggregateUnaryOperations(AggregateUnaryOperator op, 
+                       ArrayList<ColGroup> groups, MatrixBlock ret, int rl, 
int ru) throws DMLRuntimeException 
+       {
+               boolean cacheDDC1 = ColGroupValue.LOW_LEVEL_OPT 
+                               && op.indexFn instanceof ReduceCol && 
op.aggOp.increOp.fn instanceof KahanPlus //rowSums
+                               && ColGroupOffset.ALLOW_CACHE_CONSCIOUS_ROWSUMS
+                               && ru-rl > ColGroupOffset.WRITE_CACHE_BLKSZ/2;
+                       
+               //process cache-conscious DDC1 groups (adds to output)
+               if( cacheDDC1 ) {
+                       ArrayList<ColGroupDDC1> tmp = new 
ArrayList<ColGroupDDC1>();
+                       for( ColGroup grp : groups )
+                               if( grp instanceof ColGroupDDC1 )
+                                       tmp.add((ColGroupDDC1)grp);
+                       if( !tmp.isEmpty() )
+                               ColGroupDDC1.computeRowSums(tmp.toArray(new 
ColGroupDDC1[0]), ret,
+                                               
KahanPlus.getKahanPlusFnObject(), rl, ru);
+               }
+                       
+               //process remaining groups (adds to output)
+               //note: UC groups are skipped below, i.e., never aggregated by this function
+               for( ColGroup grp : groups )
+                       if( !(grp instanceof ColGroupUncompressed) 
+                               && !(cacheDDC1 && grp instanceof ColGroupDDC1) )
+                               
((ColGroupValue)grp).unaryAggregateOperations(op, ret, rl, ru);
+       }
+       
+       @Override
        public MatrixBlock transposeSelfMatrixMultOperations(MatrixBlock out, 
MMTSJType tstype) 
                throws DMLRuntimeException 
        {
@@ -1204,12 +1302,7 @@ public class CompressedMatrixBlock extends MatrixBlock 
implements Externalizable
                result.allocateDenseBlock();
 
                // delegate matrix-vector operation to each column group
-               for( ColGroup grp : _colGroups )
-                       if( grp instanceof ColGroupUncompressed ) //overwrites 
output
-                               grp.rightMultByVector(vector, result, 0, 
result.getNumRows());
-               for( ColGroup grp : _colGroups )
-                       if( !(grp instanceof ColGroupUncompressed) ) //adds to 
output
-                               grp.rightMultByVector(vector, result, 0, 
result.getNumRows());
+               rightMultByVector(_colGroups, vector, result, true, 0, 
result.getNumRows());
                
                // post-processing
                result.recomputeNonZeros();
@@ -1231,6 +1324,13 @@ public class CompressedMatrixBlock extends MatrixBlock 
implements Externalizable
 
                //multi-threaded execution of all groups
                try {
+                       ColGroupUncompressed uc = getUncompressedColGroup();
+                       
+                       //compute uncompressed column group in parallel 
+                       if( uc != null )
+                               uc.rightMultByVector(vector, result, k);        
                                
+                       
+                       //compute remaining compressed column groups in parallel
                        ExecutorService pool = Executors.newFixedThreadPool( k 
);
                        int rlen = getNumRows();
                        int seqsz = BitmapEncoder.BITMAP_BLOCK_SZ;
@@ -1239,15 +1339,48 @@ public class CompressedMatrixBlock extends MatrixBlock 
implements Externalizable
                        ArrayList<RightMatrixMultTask> tasks = new 
ArrayList<RightMatrixMultTask>();
                        for( int i=0; i<k & i*blklen<getNumRows(); i++ )
                                tasks.add(new RightMatrixMultTask(_colGroups, 
vector, result, i*blklen, Math.min((i+1)*blklen,rlen)));
-                       pool.invokeAll(tasks);  
+                       List<Future<Long>> ret = pool.invokeAll(tasks); 
                        pool.shutdown();
+                       
+                       //error handling and nnz aggregation
+                       long lnnz = 0;
+                       for( Future<Long> tmp : ret )
+                               lnnz += tmp.get(); 
+                       result.setNonZeros(lnnz);
                }
                catch(Exception ex) {
                        throw new DMLRuntimeException(ex);
                }
+       }
+       
+       private static void rightMultByVector(ArrayList<ColGroup> groups, 
MatrixBlock vect, MatrixBlock ret, boolean inclUC, int rl, int ru) 
+               throws DMLRuntimeException 
+       {
+               boolean cacheDDC1 = ColGroupValue.LOW_LEVEL_OPT 
+                       && ru-rl > ColGroupOffset.WRITE_CACHE_BLKSZ;
                
-               // post-processing
-               result.recomputeNonZeros();
+               // process uncompressed column group (overwrites output)
+               if( inclUC ) {
+                       for( ColGroup grp : groups )
+                               if( grp instanceof ColGroupUncompressed )
+                                       grp.rightMultByVector(vect, ret, rl, 
ru);
+               }
+               
+               //process cache-conscious DDC1 groups (adds to output)
+               if( cacheDDC1 ) {
+                       ArrayList<ColGroupDDC1> tmp = new 
ArrayList<ColGroupDDC1>();
+                       for( ColGroup grp : groups )
+                               if( grp instanceof ColGroupDDC1 )
+                                       tmp.add((ColGroupDDC1)grp);
+                       if( !tmp.isEmpty() )
+                               ColGroupDDC1.rightMultByVector(tmp.toArray(new 
ColGroupDDC1[0]), vect, ret, rl, ru);
+               }
+               
+               //process remaining groups (adds to output)
+               for( ColGroup grp : groups )
+                       if( !(grp instanceof ColGroupUncompressed) 
+                               && !(cacheDDC1 && grp instanceof ColGroupDDC1) )
+                               grp.rightMultByVector(vect, ret, rl, ru);
        }
        
        /**
@@ -1299,11 +1432,9 @@ public class CompressedMatrixBlock extends MatrixBlock 
implements Externalizable
         * @param k number of threads
         * @throws DMLRuntimeException if DMLRuntimeException occurs
         */
-       private static void leftMultByVectorTranspose(List<ColGroup> 
colGroups,MatrixBlock vector, MatrixBlock result, boolean doTranspose, int k) 
+       private void leftMultByVectorTranspose(List<ColGroup> 
colGroups,MatrixBlock vector, MatrixBlock result, boolean doTranspose, int k) 
                throws DMLRuntimeException 
        {
-               int kuc = Math.max(1, k - colGroups.size() + 1);
-               
                //transpose vector if required
                MatrixBlock rowVector = vector;
                if (doTranspose) {
@@ -1317,12 +1448,21 @@ public class CompressedMatrixBlock extends MatrixBlock 
implements Externalizable
 
                //multi-threaded execution
                try {
-                       ExecutorService pool = Executors.newFixedThreadPool( 
Math.min(colGroups.size(), k) );
+                       //compute uncompressed column group in parallel 
+                       ColGroupUncompressed uc = getUncompressedColGroup();
+                       if( uc != null )
+                               uc.leftMultByRowVector(vector, result, k);      
                                
+                       
+                       //compute remaining compressed column groups in parallel
+                       ExecutorService pool = Executors.newFixedThreadPool( 
Math.min(colGroups.size()-((uc!=null)?1:0), k) );
                        ArrayList<LeftMatrixMultTask> tasks = new 
ArrayList<LeftMatrixMultTask>();
                        for( ColGroup grp : colGroups )
-                               tasks.add(new LeftMatrixMultTask(grp, 
rowVector, result, kuc));
-                       pool.invokeAll(tasks);  
+                               if( !(grp instanceof ColGroupUncompressed) )
+                                       tasks.add(new LeftMatrixMultTask(grp, 
rowVector, result));
+                       List<Future<Object>> ret = pool.invokeAll(tasks);       
                        pool.shutdown();
+                       for( Future<Object> tmp : ret )
+                               tmp.get(); //error handling
                }
                catch(Exception ex) {
                        throw new DMLRuntimeException(ex);
@@ -1405,37 +1545,32 @@ public class CompressedMatrixBlock extends MatrixBlock 
implements Externalizable
 
        private static class LeftMatrixMultTask implements Callable<Object> 
        {
-               private ColGroup _group = null;
-               private MatrixBlock _vect = null;
-               private MatrixBlock _ret = null;
-               private int _kuc = 1;
+               private final ColGroup _group;
+               private final MatrixBlock _vect;
+               private final MatrixBlock _ret;
                
-               protected LeftMatrixMultTask( ColGroup group, MatrixBlock vect, 
MatrixBlock ret, int kuc)  {
+               protected LeftMatrixMultTask( ColGroup group, MatrixBlock vect, 
MatrixBlock ret)  {
                        _group = group;
                        _vect = vect;
                        _ret = ret;
-                       _kuc = kuc;
                }
                
                @Override
                public Object call() throws DMLRuntimeException 
                {
                        // delegate matrix-vector operation to each column group
-                       if( _group instanceof ColGroupUncompressed && _kuc >1 
&& ColGroupBitmap.LOW_LEVEL_OPT )
-                               
((ColGroupUncompressed)_group).leftMultByRowVector(_vect, _ret, _kuc);
-                       else
-                               _group.leftMultByRowVector(_vect, _ret);
+                       _group.leftMultByRowVector(_vect, _ret);
                        return null;
                }
        }
 
-       private static class RightMatrixMultTask implements Callable<Object> 
+       private static class RightMatrixMultTask implements Callable<Long> 
        {
-               private ArrayList<ColGroup> _groups = null;
-               private MatrixBlock _vect = null;
-               private MatrixBlock _ret = null;
-               private int _rl = -1;
-               private int _ru = -1;
+               private final ArrayList<ColGroup> _groups;
+               private final MatrixBlock _vect;
+               private final MatrixBlock _ret;
+               private final int _rl;
+               private final int _ru;
                
                protected RightMatrixMultTask( ArrayList<ColGroup> groups, 
MatrixBlock vect, MatrixBlock ret, int rl, int ru)  {
                        _groups = groups;
@@ -1446,25 +1581,18 @@ public class CompressedMatrixBlock extends MatrixBlock 
implements Externalizable
                }
                
                @Override
-               public Object call() throws DMLRuntimeException 
-               {
-                       // delegate vector-matrix operation to each column group
-                       for( ColGroup grp : _groups )
-                               if( grp instanceof ColGroupUncompressed ) 
//overwrites output
-                                       grp.rightMultByVector(_vect, _ret, _rl, 
_ru);
-                       for( ColGroup grp : _groups )
-                               if( !(grp instanceof ColGroupUncompressed) ) 
//adds to output
-                                       grp.rightMultByVector(_vect, _ret, _rl, 
_ru);
-                       return null;
+               public Long call() throws DMLRuntimeException {
+                       rightMultByVector(_groups, _vect, _ret, false, _rl, 
_ru);
+                       return _ret.recomputeNonZeros(_rl, _ru-1, 0, 0);
                }
        }
        
        private static class MatrixMultTransposeTask implements 
Callable<Object> 
        {
-               private ArrayList<ColGroup> _groups = null;
-               private MatrixBlock _ret = null;
-               private int _gl = -1;
-               private int _gu = -1;
+               private final ArrayList<ColGroup> _groups;
+               private final MatrixBlock _ret;
+               private final int _gl;
+               private final int _gu;
                
                protected MatrixMultTransposeTask(ArrayList<ColGroup> groups, 
MatrixBlock ret, int gl, int gu)  {
                        _groups = groups;
@@ -1482,11 +1610,11 @@ public class CompressedMatrixBlock extends MatrixBlock 
implements Externalizable
        
        private static class UnaryAggregateTask implements 
Callable<MatrixBlock> 
        {
-               private ArrayList<ColGroup> _groups = null;
-               private int _rl = -1;
-               private int _ru = -1;
-               private MatrixBlock _ret = null;
-               private AggregateUnaryOperator _op = null;
+               private final ArrayList<ColGroup> _groups;
+               private final int _rl;
+               private final int _ru;
+               private final MatrixBlock _ret;
+               private final AggregateUnaryOperator _op;
                
                protected UnaryAggregateTask( ArrayList<ColGroup> groups, 
MatrixBlock ret, int rl, int ru, AggregateUnaryOperator op)  {
                        _groups = groups;
@@ -1507,18 +1635,15 @@ public class CompressedMatrixBlock extends MatrixBlock 
implements Externalizable
                
                @Override
                public MatrixBlock call() throws DMLRuntimeException {
-                       // delegate unary aggregate operation to each column 
group
-                       // (uncompressed column group handles separately)
-                       for( ColGroup grp : _groups )
-                               
((ColGroupBitmap)grp).unaryAggregateOperations(_op, _ret, _rl, _ru);
+                       aggregateUnaryOperations(_op, _groups, _ret, _rl, _ru);
                        return _ret;
                }
        }
 
        private static class SizeEstimTask implements 
Callable<CompressedSizeInfo> 
        {
-               private CompressedSizeEstimator _estim = null;
-               private int _col = -1;
+               private final CompressedSizeEstimator _estim;
+               private final int _col;
                
                protected SizeEstimTask( CompressedSizeEstimator estim, int col 
)  {
                        _estim = estim;
@@ -1533,34 +1658,34 @@ public class CompressedMatrixBlock extends MatrixBlock 
implements Externalizable
 
        private static class CompressTask implements Callable<ColGroup> 
        {
-               private MatrixBlock _in = null;
-               private CompressedSizeEstimator _estim = null;
-               private HashMap<Integer, Double> _compRatios = null;
-               private int _rlen = -1;
-               private double _sp = -1;
-               private int[] _colIndexes = null;
-               
-               protected CompressTask( MatrixBlock in, CompressedSizeEstimator 
estim, HashMap<Integer, Double> compRatios, int rlen, double sp, int[] 
colIndexes )  {
+               private final MatrixBlock _in;
+               private final CompressedSizeEstimator _estim;
+               private final HashMap<Integer, Double> _compRatios;
+               private final int _rlen;
+               private final int[] _colIndexes;
+               private final boolean _denseEst;
+               
+               protected CompressTask( MatrixBlock in, CompressedSizeEstimator 
estim, HashMap<Integer, Double> compRatios, int rlen, int[] colIndexes, boolean 
denseEst )  {
                        _in = in;
                        _estim = estim;
                        _compRatios = compRatios;
                        _rlen = rlen;
-                       _sp = sp;
                        _colIndexes = colIndexes;
+                       _denseEst = denseEst;
                }
                
                @Override
                public ColGroup call() throws DMLRuntimeException {
-                       return compressColGroup(_in, _estim, _compRatios, 
_rlen, _sp, _colIndexes);
+                       return compressColGroup(_in, _estim, _compRatios, 
_rlen, _colIndexes, _denseEst);
                }
        }
        
        private static class DecompressTask implements Callable<Object> 
        {
-               private List<ColGroup> _colGroups = null;
-               private MatrixBlock _ret = null;
-               private int _rl = -1;
-               private int _ru = -1;
+               private final List<ColGroup> _colGroups;
+               private final MatrixBlock _ret;
+               private final int _rl;
+               private final int _ru;
                
                protected DecompressTask( List<ColGroup> colGroups, MatrixBlock 
ret, int rl, int ru )  {
                        _colGroups = colGroups;
@@ -1735,15 +1860,6 @@ public class CompressedMatrixBlock extends MatrixBlock 
implements Externalizable
                MatrixBlock tmp = isCompressed() ? decompress() : this;
                return tmp.zeroOutOperations(result, range, complementary);
        }
-       
-       @Override
-       public MatrixValue aggregateUnaryOperations(AggregateUnaryOperator op,
-                       MatrixValue result, int blockingFactorRow, int 
blockingFactorCol,
-                       MatrixIndexes indexesIn) throws DMLRuntimeException {
-               printDecompressWarning("aggregateUnaryOperations");
-               MatrixBlock tmp = isCompressed() ? decompress() : this;
-               return tmp.aggregateUnaryOperations(op, result, 
blockingFactorRow, blockingFactorCol, indexesIn);
-       }
 
        @Override
        public CM_COV_Object cmOperations(CMOperator op) throws 
DMLRuntimeException {
@@ -2000,4 +2116,11 @@ public class CompressedMatrixBlock extends MatrixBlock 
implements Externalizable
                        LOG.warn("Operation '"+operation+"' not supported yet - 
decompressing for ULA operations.");
                }
        }
+       
+       private HashSet<Integer> seq(int from, int to, int incr) {
+               HashSet<Integer> ret = new HashSet<Integer>();
+               for (int i = from; i <= to; i+=incr)
+                       ret.add(i);
+               return ret;
+       }
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/37a215bc/src/main/java/org/apache/sysml/runtime/compress/PlanningBinPacker.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/compress/PlanningBinPacker.java 
b/src/main/java/org/apache/sysml/runtime/compress/PlanningBinPacker.java
deleted file mode 100644
index 70308bb..0000000
--- a/src/main/java/org/apache/sysml/runtime/compress/PlanningBinPacker.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.runtime.compress;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-
-/**
- * Used for the finding columns to co-code
- * 
- */
-public class PlanningBinPacker 
-{
-       private final float _binWeight;
-       private final List<Integer> _items;
-       private final List<Float> _itemWeights;
-
-       public PlanningBinPacker(float binWeight, List<Integer> items, 
List<Float> itemWeights) {
-               _binWeight = binWeight;
-               _items = items;
-               _itemWeights = itemWeights;
-       }
-
-       /**
-        * NOTE: upper bound is 17/10 OPT
-        * 
-        * @return key: available space, value: list of the bins that have that 
free space
-        */
-       public TreeMap<Float, List<List<Integer>>> packFirstFit() {
-               return packFirstFit(_items, _itemWeights);
-       }
-
-       private TreeMap<Float, List<List<Integer>>> packFirstFit(List<Integer> 
items, List<Float> itemWeights) 
-       {
-               // when searching for a bin, the first bin in the list is used
-               TreeMap<Float, List<List<Integer>>> bins = new TreeMap<Float, 
List<List<Integer>>>();
-               // first bin
-               bins.put(_binWeight, createBinList());
-               int numItems = items.size();
-               for (int i = 0; i < numItems; i++) {
-                       float itemWeight = itemWeights.get(i);
-                       Map.Entry<Float, List<List<Integer>>> entry = bins
-                                       .ceilingEntry(itemWeight);
-                       if (entry == null) {
-                               // new bin
-                               float newBinWeight = _binWeight - itemWeight;
-                               List<List<Integer>> binList = 
bins.get(newBinWeight);
-                               if (binList == null) {
-                                       bins.put(newBinWeight, 
createBinList(items.get(i)));
-                               } else {
-                                       List<Integer> newBin = new 
ArrayList<Integer>();
-                                       newBin.add(items.get(i));
-                                       binList.add(newBin);
-                               }
-                       } else {
-                               // add to the first bin in the list
-                               List<Integer> assignedBin = 
entry.getValue().remove(0);
-                               assignedBin.add(items.get(i));
-                               if (entry.getValue().size() == 0)
-                                       bins.remove(entry.getKey());
-                               float newBinWeight = entry.getKey() - 
itemWeight;
-                               List<List<Integer>> newBinsList = 
bins.get(newBinWeight);
-                               if (newBinsList == null) {
-                                       // new bin
-                                       bins.put(newBinWeight, 
createBinList(assignedBin));
-                               } else {
-                                       newBinsList.add(assignedBin);
-                               }
-                       }
-               }
-               return bins;
-       }
-
-       private List<List<Integer>> createBinList() {
-               List<List<Integer>> binList = new ArrayList<List<Integer>>();
-               binList.add(new ArrayList<Integer>());
-               return binList;
-       }
-
-       private List<List<Integer>> createBinList(int item) {
-               List<List<Integer>> binList = new ArrayList<List<Integer>>();
-               List<Integer> bin = new ArrayList<Integer>();
-               binList.add(bin);
-               bin.add(item);
-               return binList;
-       }
-
-       private List<List<Integer>> createBinList(List<Integer> bin) {
-               List<List<Integer>> binList = new ArrayList<List<Integer>>();
-               binList.add(bin);
-               return binList;
-       }
-}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/37a215bc/src/main/java/org/apache/sysml/runtime/compress/PlanningCoCoder.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/compress/PlanningCoCoder.java 
b/src/main/java/org/apache/sysml/runtime/compress/PlanningCoCoder.java
deleted file mode 100644
index 9313cd9..0000000
--- a/src/main/java/org/apache/sysml/runtime/compress/PlanningCoCoder.java
+++ /dev/null
@@ -1,257 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.runtime.compress;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.PriorityQueue;
-import java.util.TreeMap;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-
-import org.apache.sysml.runtime.DMLRuntimeException;
-import org.apache.sysml.runtime.compress.estim.CompressedSizeEstimator;
-
-public class PlanningCoCoder 
-{
-       //constants for weight computation
-       private final static float GROUPABILITY_THRESHOLD = 0.00064f;
-       private final static float PARTITION_WEIGHT = 0.05F; //higher values 
lead to more grouping
-       private final static float PARTITION_SIZE = PARTITION_WEIGHT * 
GROUPABILITY_THRESHOLD;
-
-       public static List<int[]> 
findCocodesByPartitioning(CompressedSizeEstimator sizeEstimator, List<Integer> 
availCols, 
-                       List<Integer> colsCardinalities, List<Long> 
compressedSize, int numRows, double sparsity, int k) 
-               throws DMLRuntimeException 
-       {
-               List<int[]> retGroups = new ArrayList<int[]>();
-               
-               // filtering out non-groupable columns as singleton groups
-               // weighted of each column is the ratio of its cardinality to 
the number
-               // of rows scaled by the matrix sparsity
-               int numCols = availCols.size();
-               List<Integer> groupCols = new ArrayList<Integer>();
-               List<Float> groupColWeights = new ArrayList<Float>();
-               HashMap<Integer, GroupableColInfo> groupColsInfo = new 
HashMap<Integer, GroupableColInfo>();
-               for (int i = 0; i < numCols; i++) {
-                       int colIx = availCols.get(i);
-                       int cardinality = colsCardinalities.get(i);
-                       float weight = ((float) cardinality) / numRows;
-                       if (weight <= GROUPABILITY_THRESHOLD) {
-                               groupCols.add(colIx);
-                               groupColWeights.add(weight);
-                               groupColsInfo.put(colIx, new 
GroupableColInfo(weight,compressedSize.get(i)));
-                       } else {
-                               retGroups.add(new int[] { colIx });
-                       }
-               }
-               
-               // bin packing based on PARTITION_WEIGHT and column weights
-               float weight = computeWeightForCoCoding(numRows, sparsity);
-               TreeMap<Float, List<List<Integer>>> bins = new 
PlanningBinPacker(
-                               weight, groupCols, 
groupColWeights).packFirstFit();
-
-               // brute force grouping within each partition
-               retGroups.addAll( (k > 1) ?
-                               getCocodingGroupsBruteForce(bins, 
groupColsInfo, sizeEstimator, numRows, k) :
-                               getCocodingGroupsBruteForce(bins, 
groupColsInfo, sizeEstimator, numRows));
-                       
-               return retGroups;
-       }
-
-       private static List<int[]> getCocodingGroupsBruteForce(TreeMap<Float, 
List<List<Integer>>> bins, HashMap<Integer, GroupableColInfo> groupColsInfo, 
CompressedSizeEstimator estim, int rlen) 
-       {
-               List<int[]> retGroups = new ArrayList<int[]>();         
-               for (List<List<Integer>> binList : bins.values()) {
-                       for (List<Integer> bin : binList) {
-                               // building an array of singleton CoCodingGroup
-                               ArrayList<PlanningCoCodingGroup> sgroups = new 
ArrayList<PlanningCoCodingGroup>();
-                               for (Integer col : bin)
-                                       sgroups.add(new 
PlanningCoCodingGroup(col, groupColsInfo.get(col)));
-                               // brute force co-coding        
-                               PlanningCoCodingGroup[] outputGroups = 
findCocodesBruteForce(
-                                               estim, rlen, 
sgroups.toArray(new PlanningCoCodingGroup[0]));
-                               for (PlanningCoCodingGroup grp : outputGroups)
-                                       retGroups.add(grp.getColIndices());
-                       }
-               }
-               
-               return retGroups;
-       }
-
-       private static List<int[]> getCocodingGroupsBruteForce(TreeMap<Float, 
List<List<Integer>>> bins, HashMap<Integer, GroupableColInfo> groupColsInfo, 
CompressedSizeEstimator estim, int rlen, int k) 
-               throws DMLRuntimeException 
-       {
-               List<int[]> retGroups = new ArrayList<int[]>();         
-               try {
-                       ExecutorService pool = Executors.newFixedThreadPool( k 
);
-                       ArrayList<CocodeTask> tasks = new 
ArrayList<CocodeTask>();
-                       for (List<List<Integer>> binList : bins.values())
-                               for (List<Integer> bin : binList) {
-                                       // building an array of singleton 
CoCodingGroup
-                                       ArrayList<PlanningCoCodingGroup> 
sgroups = new ArrayList<PlanningCoCodingGroup>();
-                                       for (Integer col : bin)
-                                               sgroups.add(new 
PlanningCoCodingGroup(col, groupColsInfo.get(col)));
-                                       tasks.add(new CocodeTask(estim, 
sgroups, rlen));
-                               }
-                       List<Future<PlanningCoCodingGroup[]>> rtask = 
pool.invokeAll(tasks);    
-                       for( Future<PlanningCoCodingGroup[]> lrtask : rtask )
-                               for (PlanningCoCodingGroup grp : lrtask.get())
-                                       retGroups.add(grp.getColIndices());
-                       pool.shutdown();
-               }
-               catch(Exception ex) {
-                       throw new DMLRuntimeException(ex);
-               }
-               
-               return retGroups;
-       }
-
-       /**
-        * Identify columns to code together. Uses a greedy approach that merges
-        * pairs of column groups into larger groups. Each phase of the greedy
-        * algorithm considers all combinations of pairs to merge.
-        * 
-        * @param sizeEstimator compressed size estimator
-        * @param numRowsWeight number of rows weight
-        * @param singltonGroups planning co-coding groups
-        * @return
-        */
-       private static PlanningCoCodingGroup[] findCocodesBruteForce(
-                       CompressedSizeEstimator sizeEstimator, float 
numRowsWeight,
-                       PlanningCoCodingGroup[] singltonGroups) 
-       {
-               // Populate a priority queue with all available 2-column 
cocodings.
-               PriorityQueue<PlanningGroupMergeAction> q = new 
PriorityQueue<PlanningGroupMergeAction>();
-               for (int leftIx = 0; leftIx < singltonGroups.length; leftIx++) {
-                       PlanningCoCodingGroup leftGrp = singltonGroups[leftIx];
-                       for (int rightIx = leftIx + 1; rightIx < 
singltonGroups.length; rightIx++) {
-                               PlanningCoCodingGroup rightGrp = 
singltonGroups[rightIx];
-                               // at least one of the two groups should be 
low-cardinality
-                               float cardRatio = leftGrp.getCardinalityRatio() 
+ rightGrp.getCardinalityRatio(); 
-                               if ( cardRatio < GROUPABILITY_THRESHOLD) {
-                                       PlanningGroupMergeAction potentialMerge 
= new PlanningGroupMergeAction(
-                                                       sizeEstimator, 
numRowsWeight, leftGrp, rightGrp);
-                                       if (potentialMerge.getChangeInSize() < 
0) {
-                                               q.add(potentialMerge);
-                                       }
-                               }
-                       }
-               }
-               PlanningCoCodingGroup[] colGroups = singltonGroups;
-               
-               // Greedily merge groups until we can no longer reduce the 
number of
-               // runs by merging groups
-               while (q.size() > 0) {
-                       PlanningGroupMergeAction merge = q.poll();
-
-                       // The queue can contain merge actions involving column 
groups that
-                       // have already been merged.
-                       // Filter those actions out.
-                       int leftIx = findInArray(colGroups, merge.getLeftGrp());
-                       int rightIx = findInArray(colGroups, 
merge.getRightGrp());
-                       if (leftIx < 0 || rightIx < 0) {
-                               // One or more of the groups to be merged has 
already been made
-                               // part of another group.
-                               // Drop the merge action.
-                       } else {
-                               PlanningCoCodingGroup mergedGrp = 
merge.getMergedGrp();
-
-                               PlanningCoCodingGroup[] newColGroups = new 
PlanningCoCodingGroup[colGroups.length - 1];
-                               int targetIx = 0;
-                               for (int i = 0; i < colGroups.length; i++) {
-                                       if (i != leftIx && i != rightIx) {
-                                               newColGroups[targetIx] = 
colGroups[i];
-                                               targetIx++;
-                                       }
-                               }
-
-                               // New group goes at the end to (hopefully) 
speed up future
-                               // linear search operations
-                               newColGroups[newColGroups.length - 1] = 
mergedGrp;
-
-                               // Consider merging the new group with all the 
other
-                               // pre-existing groups.
-                               for (int i = 0; i < newColGroups.length - 1; 
i++) {
-                                       PlanningCoCodingGroup newLeftGrp = 
newColGroups[i];
-                                       PlanningCoCodingGroup newRightGrp = 
mergedGrp;
-                                       if (newLeftGrp.getCardinalityRatio()
-                                                       + 
newRightGrp.getCardinalityRatio() < GROUPABILITY_THRESHOLD) {
-                                               PlanningGroupMergeAction 
newPotentialMerge = new PlanningGroupMergeAction(
-                                                               sizeEstimator, 
numRowsWeight, newLeftGrp,
-                                                               newRightGrp);
-                                               if 
(newPotentialMerge.getChangeInSize() < 0) {
-                                                       
q.add(newPotentialMerge);
-                                               }
-                                       }
-                               }
-                               colGroups = newColGroups;
-                       }
-               }
-               return colGroups;
-       }
-
-       private static float computeWeightForCoCoding(int numRows, double 
sparsity) {
-               //we use a constant partition size (independent of the number 
of rows
-               //in order to ensure constant compression speed independent of 
blocking)
-               return PARTITION_SIZE;
-       }
-
-       private static int findInArray(Object[] arr, Object val) {
-               for (int i = 0; i < arr.length; i++) {
-                       if (arr[i].equals(val)) {
-                               return i;
-                       }
-               }
-               return -1;
-       }
-
-       protected static class GroupableColInfo {
-               float cardRatio;
-               long size;
-
-               public GroupableColInfo(float lcardRatio, long lsize) {
-                       cardRatio = lcardRatio;
-                       size = lsize;
-               }
-       }
-
-       private static class CocodeTask implements 
Callable<PlanningCoCodingGroup[]> 
-       {
-               private CompressedSizeEstimator _estim = null;
-               private ArrayList<PlanningCoCodingGroup> _sgroups = null;
-               private int _rlen = -1;
-               
-               protected CocodeTask( CompressedSizeEstimator estim, 
ArrayList<PlanningCoCodingGroup> sgroups, int rlen )  {
-                       _estim = estim;
-                       _sgroups = sgroups;
-                       _rlen = rlen;
-               }
-               
-               @Override
-               public PlanningCoCodingGroup[] call() throws 
DMLRuntimeException {
-                       // brute force co-coding        
-                       return findCocodesBruteForce(_estim, _rlen, 
-                                       _sgroups.toArray(new 
PlanningCoCodingGroup[0]));
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/37a215bc/src/main/java/org/apache/sysml/runtime/compress/PlanningCoCodingGroup.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/compress/PlanningCoCodingGroup.java 
b/src/main/java/org/apache/sysml/runtime/compress/PlanningCoCodingGroup.java
deleted file mode 100644
index 9ee0d7e..0000000
--- a/src/main/java/org/apache/sysml/runtime/compress/PlanningCoCodingGroup.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.runtime.compress;
-
-import java.util.Arrays;
-
-import org.apache.sysml.runtime.compress.PlanningCoCoder.GroupableColInfo;
-import org.apache.sysml.runtime.compress.estim.CompressedSizeEstimator;
-import org.apache.sysml.runtime.compress.estim.CompressedSizeInfo;
-
-/** 
- * Class to represent information about co-coding a group of columns. 
- * 
- */
-public class PlanningCoCodingGroup 
-{
-       private int[] _colIndexes;
-       private long _estSize;
-       private float _cardRatio;
-
-       /**
-        * Constructor for a one-column group; i.e. do not co-code a given 
column.
-        * 
-        * @param col column
-        * @param info groupable column info
-        */
-       public PlanningCoCodingGroup(int col, GroupableColInfo info) {
-               _colIndexes = new int[]{col};
-               _estSize = info.size;
-               _cardRatio = info.cardRatio;
-       }
-
-       /**
-        * Constructor for merging two disjoint groups of columns
-        * 
-        * @param grp1   first group of columns to merge
-        * @param grp2   second group to merge
-        * @param bitmapSizeEstimator bitmap size estimator
-        * @param numRowsWeight numRows x sparsity
-        */
-       public PlanningCoCodingGroup(PlanningCoCodingGroup grp1, 
PlanningCoCodingGroup grp2,
-                       CompressedSizeEstimator bitmapSizeEstimator, float 
numRowsWeight) 
-       {
-               // merge sorted non-empty arrays
-               _colIndexes = new int[grp1._colIndexes.length + 
grp2._colIndexes.length];               
-               int grp1Ptr = 0, grp2Ptr = 0;
-               for (int mergedIx = 0; mergedIx < _colIndexes.length; 
mergedIx++) {
-                       if (grp1._colIndexes[grp1Ptr] < 
grp2._colIndexes[grp2Ptr]) {
-                               _colIndexes[mergedIx] = 
grp1._colIndexes[grp1Ptr++];
-                               if (grp1Ptr == grp1._colIndexes.length) {
-                                       System.arraycopy(grp2._colIndexes, 
grp2Ptr, _colIndexes,
-                                                       mergedIx + 1, 
grp2._colIndexes.length - grp2Ptr);
-                                       break;
-                               }
-                       } else {
-                               _colIndexes[mergedIx] = 
grp2._colIndexes[grp2Ptr++];
-                               if (grp2Ptr == grp2._colIndexes.length) {
-                                       System.arraycopy(grp1._colIndexes, 
grp1Ptr, _colIndexes,
-                                                       mergedIx + 1, 
grp1._colIndexes.length - grp1Ptr);
-                                       break;
-                               }
-                       }
-               }
-               
-               // estimating size info
-               CompressedSizeInfo groupSizeInfo = bitmapSizeEstimator
-                               .estimateCompressedColGroupSize(_colIndexes);
-               _estSize = groupSizeInfo.getMinSize();
-               _cardRatio = groupSizeInfo.getEstCarinality() / numRowsWeight;
-       }
-
-       public int[] getColIndices() {
-               return _colIndexes;
-       }
-
-       /**
-        * Obtain estimated compressed size of the grouped columns.
-        * 
-        * @return estimated compressed size of the grouped columns
-        */
-       public long getEstSize() {
-               return _estSize;
-       }
-
-       public float getCardinalityRatio() {
-               return _cardRatio;
-       }
-
-       @Override
-       public String toString() {
-               return Arrays.toString(_colIndexes);
-       }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/37a215bc/src/main/java/org/apache/sysml/runtime/compress/PlanningGroupMergeAction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/PlanningGroupMergeAction.java b/src/main/java/org/apache/sysml/runtime/compress/PlanningGroupMergeAction.java
deleted file mode 100644
index 47d46d5..0000000
--- a/src/main/java/org/apache/sysml/runtime/compress/PlanningGroupMergeAction.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.runtime.compress;
-
-import org.apache.sysml.runtime.compress.estim.CompressedSizeEstimator;
-
-/**
- * Internal data structure for tracking potential merges of column groups in
- * co-coding calculations.
- * 
- */
-class PlanningGroupMergeAction implements Comparable<PlanningGroupMergeAction> 
-{
-       private PlanningCoCodingGroup _leftGrp;   //left input
-       private PlanningCoCodingGroup _rightGrp;  //right input
-       private PlanningCoCodingGroup _mergedGrp; //output
-       private long _changeInSize;
-
-       
-       public PlanningGroupMergeAction(CompressedSizeEstimator sizeEstimator,
-                       float numRowsWeight, PlanningCoCodingGroup leftGrp, 
PlanningCoCodingGroup rightGrp) {
-               _leftGrp = leftGrp;
-               _rightGrp = rightGrp;
-               _mergedGrp = new PlanningCoCodingGroup(leftGrp, rightGrp, 
sizeEstimator, numRowsWeight);
-
-               // Negative size change ==> Decrease in size
-               _changeInSize = _mergedGrp.getEstSize() 
-                               - leftGrp.getEstSize() - rightGrp.getEstSize();
-       }
-
-       public int compareTo(PlanningGroupMergeAction o) {
-               // We only sort by the change in size
-               return (int) Math.signum(_changeInSize - o._changeInSize);
-       }
-
-       @Override
-       public String toString() {
-               return String.format("Merge %s and %s", _leftGrp, _rightGrp);
-       }
-
-       public PlanningCoCodingGroup getLeftGrp() {
-               return _leftGrp;
-       }
-
-       public PlanningCoCodingGroup getRightGrp() {
-               return _rightGrp;
-       }
-
-       public PlanningCoCodingGroup getMergedGrp() {
-               return _mergedGrp;
-       }
-
-       public long getChangeInSize() {
-               return _changeInSize;
-       }
-}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/37a215bc/src/main/java/org/apache/sysml/runtime/compress/ReaderColumnSelectionSparse.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/ReaderColumnSelectionSparse.java b/src/main/java/org/apache/sysml/runtime/compress/ReaderColumnSelectionSparse.java
index 63c0467..60d0532 100644
--- a/src/main/java/org/apache/sysml/runtime/compress/ReaderColumnSelectionSparse.java
+++ b/src/main/java/org/apache/sysml/runtime/compress/ReaderColumnSelectionSparse.java
@@ -63,7 +63,6 @@ public class ReaderColumnSelectionSparse extends ReaderColumnSelection
                if( data.getSparseBlock()!=null )
                for( int i=0; i<colIndexes.length; i++ )
                        sparseCols[i] = 
data.getSparseBlock().get(colIndexes[i]);
-               Arrays.fill(sparsePos, 0);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/37a215bc/src/main/java/org/apache/sysml/runtime/compress/UncompressedBitmap.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/UncompressedBitmap.java b/src/main/java/org/apache/sysml/runtime/compress/UncompressedBitmap.java
index d62bae9..2f68edf 100644
--- a/src/main/java/org/apache/sysml/runtime/compress/UncompressedBitmap.java
+++ b/src/main/java/org/apache/sysml/runtime/compress/UncompressedBitmap.java
@@ -21,10 +21,13 @@ package org.apache.sysml.runtime.compress;
 
 import java.util.Arrays;
 
+import org.apache.commons.lang.ArrayUtils;
 import org.apache.sysml.runtime.compress.utils.DblArrayIntListHashMap;
 import org.apache.sysml.runtime.compress.utils.DoubleIntListHashMap;
 import 
org.apache.sysml.runtime.compress.utils.DblArrayIntListHashMap.DArrayIListEntry;
 import 
org.apache.sysml.runtime.compress.utils.DoubleIntListHashMap.DIListEntry;
+import org.apache.sysml.runtime.compress.utils.IntArrayList;
+import org.apache.sysml.runtime.util.SortUtils;
 
 /** 
  * Uncompressed representation of one or more columns in bitmap format. 
@@ -32,13 +35,13 @@ import 
org.apache.sysml.runtime.compress.utils.DoubleIntListHashMap.DIListEntry;
  */
 public final class UncompressedBitmap 
 {
-       private int _numCols;
+       private final int _numCols;
 
        /** Distinct values that appear in the column. Linearized as value 
groups <v11 v12> <v21 v22>.*/
        private double[] _values;
 
        /** Bitmaps (as lists of offsets) for each of the values. */
-       private int[][] _offsetsLists;
+       private IntArrayList[] _offsetsLists;
 
        public UncompressedBitmap( DblArrayIntListHashMap distinctVals, int 
numColumns ) 
        {
@@ -46,11 +49,11 @@ public final class UncompressedBitmap
                // Convert inputs to arrays
                int numVals = distinctVals.size();
                _values = new double[numVals*numColumns];
-               _offsetsLists = new int[numVals][];
+               _offsetsLists = new IntArrayList[numVals];
                int bitmapIx = 0;
                for( DArrayIListEntry val : distinctVals.extractValues()) {
                        System.arraycopy(val.key.getData(), 0, _values, 
bitmapIx*numColumns, numColumns);
-                       _offsetsLists[bitmapIx++] = val.value.extractValues();
+                       _offsetsLists[bitmapIx++] = val.value;
                }
                _numCols = numColumns;
        }
@@ -61,11 +64,11 @@ public final class UncompressedBitmap
                // Convert inputs to arrays
                int numVals = distinctVals.size();
                _values = new double[numVals];
-               _offsetsLists = new int[numVals][];
+               _offsetsLists = new IntArrayList[numVals];
                int bitmapIx = 0;
                for(DIListEntry val : distinctVals.extractValues()) {
                        _values[bitmapIx] = val.key;
-                       _offsetsLists[bitmapIx++] = val.value.extractValues();
+                       _offsetsLists[bitmapIx++] = val.value;
                }
                _numCols = 1;
        }
@@ -74,6 +77,15 @@ public final class UncompressedBitmap
                return _numCols;
        }
        
+       /** 
+        * Get all values without unnecessary allocations and copies.
+        * The internal array is returned as is (no copy), so caller-side writes change this bitmap.
+        * @return dictionary of value tuples, linearized tuple by tuple (numCols entries per tuple)
+        */
+       public double[] getValues() {
+               return _values;
+       }
+       
        /**
         * Obtain tuple of column values associated with index.
         * 
@@ -94,21 +106,46 @@ public final class UncompressedBitmap
                return _values.length / _numCols;
        }
 
-       /**
-        * Obtain array of offsets of the rows containing index value
-        * 
-        * @param ix   index of a particular distinct value
-        * @return IMMUTABLE array of the offsets of the rows containing the 
value
-        *         with the indicated index
-        */
-       public int[] getOffsetsList(int ix) {
+       public IntArrayList getOffsetsList(int ix) {
                return _offsetsLists[ix];
        }
 
-       public int getNumOffsets() {
-               int ret = 0;
-               for( int[] offlist : _offsetsLists )
-                       ret += offlist.length;
+       public long getNumOffsets() {
+               long ret = 0;
+               for( IntArrayList offlist : _offsetsLists )
+                       ret += offlist.size();
                return ret;
        }
+       /** Number of offsets stored in the offset list of the distinct value at index ix. */
+       public int getNumOffsets(int ix) {
+               return _offsetsLists[ix].size();
+       }
+       /** Reorders the value tuples and their offset lists by number of offsets, most frequent value first. */
+       public void sortValuesByFrequency() {
+               int numVals = getNumValues();
+               int numCols = getNumColumns();
+               
+               double[] freq = new double[numVals];
+               int[] pos = new int[numVals];
+               
+               //populate the temporary arrays: freq[i] = offset count of value i, pos[i] = original index i
+               for(int i=0; i<numVals; i++) {
+                       freq[i] = getNumOffsets(i);
+                       pos[i] = i;
+               }
+               
+               //sort ascending and reverse (descending); assumes sortByValue permutes pos alongside freq - TODO confirm
+               SortUtils.sortByValue(0, numVals, freq, pos);
+               ArrayUtils.reverse(pos);
+               
+               //create new value and offset list arrays, moving old tuple pos[i] (numCols values) into slot i
+               double[] lvalues = new double[numVals*numCols];
+               IntArrayList[] loffsets = new IntArrayList[numVals];
+               for(int i=0; i<numVals; i++) {
+                       System.arraycopy(_values, pos[i]*numCols, lvalues, 
i*numCols, numCols);
+                       loffsets[i] = _offsetsLists[pos[i]];
+               }
+               _values = lvalues;
+               _offsetsLists = loffsets;
+       }
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/37a215bc/src/main/java/org/apache/sysml/runtime/compress/cocode/ColumnGroupPartitioner.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/cocode/ColumnGroupPartitioner.java b/src/main/java/org/apache/sysml/runtime/compress/cocode/ColumnGroupPartitioner.java
new file mode 100644
index 0000000..05af19d
--- /dev/null
+++ b/src/main/java/org/apache/sysml/runtime/compress/cocode/ColumnGroupPartitioner.java
@@ -0,0 +1,19 @@
+package org.apache.sysml.runtime.compress.cocode;
+
+import java.util.HashMap;
+import java.util.List;
+
+import 
org.apache.sysml.runtime.compress.cocode.PlanningCoCoder.GroupableColInfo;
+/** Base class for column partitioning strategies; subclasses implement partitionColumns. */
+public abstract class ColumnGroupPartitioner 
+{
+       /**
+        * Partitions a list of columns into a list of partitions, each holding a subset of the columns.
+        * Note that this call must compute a complete and disjoint partitioning, i.e., every column ends up in exactly one partition.
+        * 
+        * @param groupCols list of columns 
+        * @param groupColsInfo map from column to its groupable column info
+        * @return list of partitions (where each partition is a list of columns)
+        */
+       public abstract List<List<Integer>> partitionColumns(List<Integer> 
groupCols, HashMap<Integer, GroupableColInfo> groupColsInfo);
+}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/37a215bc/src/main/java/org/apache/sysml/runtime/compress/cocode/ColumnGroupPartitionerBinPacking.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/cocode/ColumnGroupPartitionerBinPacking.java b/src/main/java/org/apache/sysml/runtime/compress/cocode/ColumnGroupPartitionerBinPacking.java
new file mode 100644
index 0000000..0fb6abe
--- /dev/null
+++ b/src/main/java/org/apache/sysml/runtime/compress/cocode/ColumnGroupPartitionerBinPacking.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.compress.cocode;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+
+import org.apache.commons.lang.ArrayUtils;
+import 
org.apache.sysml.runtime.compress.cocode.PlanningCoCoder.GroupableColInfo;
+import org.apache.sysml.runtime.util.SortUtils;
+
+/**
+ * Column group partitioning with bin packing heuristic.
+ * Columns are weighted by their cardinality ratio and packed first-fit into bins of BIN_CAPACITY.
+ */
+public class ColumnGroupPartitionerBinPacking extends ColumnGroupPartitioner
+{
+       private static final boolean FIRST_FIT_DEC = true; //sort items by decreasing weight before packing
+       private static final int MAX_COL_PER_GROUP = Integer.MAX_VALUE; //column limit per bin, see size check in packFirstFit
+
+       //we use a constant partition size (independent of the number of rows)
+       //in order to ensure constant compression speed independent of blocking
+       public static double BIN_CAPACITY = 0.000032; //higher values, more 
grouping
+       /** Packs the given columns into bins by cardinality ratio and returns one column list per bin. */
+       @Override
+       public List<List<Integer>> partitionColumns(List<Integer> groupCols, 
HashMap<Integer, GroupableColInfo> groupColsInfo) 
+       {
+               //obtain column ids and their weights (cardinality ratios)
+               int[] items = new int[groupCols.size()];
+               double[] itemWeights = new double[groupCols.size()];
+               for( int i=0; i<groupCols.size(); i++ ) {
+                       int col = groupCols.get(i);
+                       items[i] = col;
+                       itemWeights[i] = groupColsInfo.get(col).cardRatio;
+               } 
+               
+               //sort items by weight and reverse both arrays (first fit decreasing)
+               if( FIRST_FIT_DEC ) {
+                       SortUtils.sortByValue(0, items.length, itemWeights, 
items);
+                       ArrayUtils.reverse(items);
+                       ArrayUtils.reverse(itemWeights);
+               }
+               
+               //partition columns via bin packing
+               return packFirstFit(items, itemWeights);
+       }
+
+       /**
+        * Places each item into the first bin with enough remaining capacity, opening a new bin if none fits.
+        * NOTE: upper bound is 17/10 OPT
+        * @param items the items in terms of columns
+        * @param itemWeights the weights of the items
+        * @return list of bins, each holding the columns assigned to it
+        */
+       private List<List<Integer>> packFirstFit(int[] items, double[] 
itemWeights) 
+       {
+               List<List<Integer>> bins = new ArrayList<List<Integer>>();
+               List<Double> binWeights = new ArrayList<Double>(); //remaining capacity per bin
+               
+               for( int i = 0; i < items.length; i++ ) {
+                       //add to first existing bin that still fits; NOTE(review): size check caps bins at MAX_COL_PER_GROUP-1 columns - confirm intended
+                       boolean assigned = false;
+                       for( int j = 0; j < bins.size(); j++ ) {
+                               double newBinWeight = 
binWeights.get(j)-itemWeights[i];
+                               if( newBinWeight >= 0 && bins.get(j).size() < 
MAX_COL_PER_GROUP-1 ){
+                                       bins.get(j).add(items[i]);
+                                       binWeights.set(j, newBinWeight);
+                                       assigned = true; break;
+                               }
+                       }
+                               
+                       //no existing bin fits: create new bin at end of list, holding only this item
+                       if( !assigned ) {
+                               bins.add(new 
ArrayList<Integer>(Arrays.asList(items[i])));
+                               binWeights.add(BIN_CAPACITY-itemWeights[i]);
+                       }               
+               }
+               
+               return bins;
+       }
+}


Reply via email to