http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/37a215bc/src/main/java/org/apache/sysml/runtime/compress/cocode/ColumnGroupPartitionerStatic.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/compress/cocode/ColumnGroupPartitionerStatic.java
 
b/src/main/java/org/apache/sysml/runtime/compress/cocode/ColumnGroupPartitionerStatic.java
new file mode 100644
index 0000000..e2f00f3
--- /dev/null
+++ 
b/src/main/java/org/apache/sysml/runtime/compress/cocode/ColumnGroupPartitionerStatic.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.compress.cocode;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+import 
org.apache.sysml.runtime.compress.cocode.PlanningCoCoder.GroupableColInfo;
+
+/**
+ * Column group partitioning with static distribution heuristic.
+ * 
+ */
+public class ColumnGroupPartitionerStatic extends ColumnGroupPartitioner
+{
+       private static final int MAX_COL_PER_GROUP = 20;
+
+       /**
+        * Splits the given columns, in their given order, into consecutive
+        * partitions of (almost) equal size, each holding at most
+        * MAX_COL_PER_GROUP columns. The column info map is not consulted.
+        * An empty column list yields an empty list of partitions.
+        * 
+        * @param groupCols column indexes to partition
+        * @param groupColsInfo per-column info (unused by this partitioner)
+        * @return list of partitions, each a list of column indexes
+        */
+       @Override
+       public List<List<Integer>> partitionColumns(List<Integer> groupCols, HashMap<Integer, GroupableColInfo> groupColsInfo) 
+       {
+               final int numCols = groupCols.size();
+               final int numParts = (int)Math.ceil((double)numCols/MAX_COL_PER_GROUP);
+               final int partSize = (int)Math.ceil((double)numCols/numParts);
+
+               List<List<Integer>> partitions = new ArrayList<List<Integer>>();
+               for( int part=0; part<numParts; part++ ) {
+                       //half-open index range [begin,end) of this partition
+                       int begin = part * partSize;
+                       int end = Math.min(begin + partSize, numCols);
+                       List<Integer> members = new ArrayList<Integer>();
+                       for( int ix=begin; ix<end; ix++ )
+                               members.add(groupCols.get(ix));
+                       partitions.add(members);
+               }
+
+               return partitions;
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/37a215bc/src/main/java/org/apache/sysml/runtime/compress/cocode/PlanningCoCoder.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/compress/cocode/PlanningCoCoder.java 
b/src/main/java/org/apache/sysml/runtime/compress/cocode/PlanningCoCoder.java
new file mode 100644
index 0000000..778f221
--- /dev/null
+++ 
b/src/main/java/org/apache/sysml/runtime/compress/cocode/PlanningCoCoder.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.compress.cocode;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sysml.runtime.DMLRuntimeException;
+import org.apache.sysml.runtime.compress.estim.CompressedSizeEstimator;
+import org.apache.sysml.runtime.compress.estim.CompressedSizeInfo;
+
+public class PlanningCoCoder 
+{
+       //internal configurations 
+       private final static PartitionerType COLUMN_PARTITIONER = PartitionerType.BIN_PACKING;
+
+       private static final Log LOG = LogFactory.getLog(PlanningCoCoder.class.getName());
+
+       /**
+        * Supported strategies for partitioning columns into bins; merges
+        * are only searched among the columns of the same bin.
+        */
+       public enum PartitionerType {
+               BIN_PACKING,
+               STATIC,
+       }
+
+       /**
+        * Determines the column groups to co-code. All given columns are first
+        * partitioned into bins by the configured column group partitioner, and
+        * each bin is then searched independently for beneficial merges.
+        * 
+        * @param sizeEstimator compressed size estimator
+        * @param cols column indexes to consider for co-coding
+        * @param colInfos compressed size infos, indexed by column index
+        * @param numRows number of rows
+        * @param k degree of parallelism; bins are processed by a thread pool of size k if k is larger than 1
+        * @return list of column index arrays, one per resulting column group
+        * @throws DMLRuntimeException if the multi-threaded search fails
+        */
+       public static List<int[]> findCocodesByPartitioning(CompressedSizeEstimator sizeEstimator, List<Integer> cols, 
+                       CompressedSizeInfo[] colInfos, int numRows, int k) 
+               throws DMLRuntimeException 
+       {
+               // collect the per-column info of all given columns (no column is
+               // filtered out here); the weight of a column is the ratio of its
+               // estimated cardinality to the number of rows
+               int numCols = cols.size();
+               List<Integer> groupCols = new ArrayList<Integer>();
+               HashMap<Integer, GroupableColInfo> groupColsInfo = new HashMap<Integer, GroupableColInfo>();
+               for (int i = 0; i < numCols; i++) {
+                       int colIx = cols.get(i);
+                       double cardinality = colInfos[colIx].getEstCard();
+                       double weight = cardinality / numRows;
+                       groupCols.add(colIx);
+                       groupColsInfo.put(colIx, new GroupableColInfo(weight,colInfos[colIx].getMinSize()));
+               }
+
+               // use column group partitioner to create partitions of columns
+               List<List<Integer>> bins = createColumnGroupPartitioner(COLUMN_PARTITIONER)
+                               .partitionColumns(groupCols, groupColsInfo);
+
+               // brute force grouping within each partition
+               return (k > 1) ?
+                               getCocodingGroupsBruteForce(bins, groupColsInfo, sizeEstimator, numRows, k) :
+                               getCocodingGroupsBruteForce(bins, groupColsInfo, sizeEstimator, numRows);
+       }
+
+       /**
+        * Single-threaded search: processes the bins one after another and
+        * collects the column indexes of all resulting groups.
+        * 
+        * @param bins partitions of column indexes
+        * @param groupColsInfo per-column info, keyed by column index
+        * @param estim compressed size estimator
+        * @param rlen number of rows
+        * @return list of column index arrays, one per resulting column group
+        */
+       private static List<int[]> getCocodingGroupsBruteForce(List<List<Integer>> bins, HashMap<Integer, GroupableColInfo> groupColsInfo, CompressedSizeEstimator estim, int rlen) 
+       {
+               List<int[]> retGroups = new ArrayList<int[]>();
+               for (List<Integer> bin : bins) {
+                       // building an array of singleton CoCodingGroup
+                       ArrayList<PlanningCoCodingGroup> sgroups = createSingletonGroups(bin, groupColsInfo);
+                       // brute force co-coding
+                       PlanningCoCodingGroup[] outputGroups = findCocodesBruteForce(
+                                       estim, rlen, sgroups.toArray(new PlanningCoCodingGroup[0]));
+                       for (PlanningCoCodingGroup grp : outputGroups)
+                               retGroups.add(grp.getColIndices());
+               }
+
+               return retGroups;
+       }
+
+       /**
+        * Multi-threaded search: creates one task per bin, runs all tasks on a
+        * fixed thread pool of size k, and collects the column indexes of all
+        * resulting groups in bin order.
+        * 
+        * @param bins partitions of column indexes
+        * @param groupColsInfo per-column info, keyed by column index
+        * @param estim compressed size estimator
+        * @param rlen number of rows
+        * @param k number of threads
+        * @return list of column index arrays, one per resulting column group
+        * @throws DMLRuntimeException wrapping any failure of pool creation or task execution
+        */
+       private static List<int[]> getCocodingGroupsBruteForce(List<List<Integer>> bins, HashMap<Integer, GroupableColInfo> groupColsInfo, CompressedSizeEstimator estim, int rlen, int k) 
+               throws DMLRuntimeException 
+       {
+               List<int[]> retGroups = new ArrayList<int[]>();
+               ExecutorService pool = null;
+               try {
+                       pool = Executors.newFixedThreadPool( k );
+                       ArrayList<CocodeTask> tasks = new ArrayList<CocodeTask>();
+                       for (List<Integer> bin : bins) {
+                               // building an array of singleton CoCodingGroup
+                               tasks.add(new CocodeTask(estim, createSingletonGroups(bin, groupColsInfo), rlen));
+                       }
+                       List<Future<PlanningCoCodingGroup[]>> rtask = pool.invokeAll(tasks);
+                       for( Future<PlanningCoCodingGroup[]> lrtask : rtask )
+                               for (PlanningCoCodingGroup grp : lrtask.get())
+                                       retGroups.add(grp.getColIndices());
+               }
+               catch(Exception ex) {
+                       throw new DMLRuntimeException(ex);
+               }
+               finally {
+                       //release the worker threads on failure as well, otherwise
+                       //the pool threads would leak whenever a task throws
+                       if( pool != null )
+                               pool.shutdown();
+               }
+
+               return retGroups;
+       }
+
+       /**
+        * Wraps each column of a bin into its own single-column planning group.
+        * 
+        * @param bin column indexes of one partition
+        * @param groupColsInfo per-column info, keyed by column index
+        * @return list of single-column groups in the order of the bin
+        */
+       private static ArrayList<PlanningCoCodingGroup> createSingletonGroups(List<Integer> bin, HashMap<Integer, GroupableColInfo> groupColsInfo) 
+       {
+               ArrayList<PlanningCoCodingGroup> sgroups = new ArrayList<PlanningCoCodingGroup>();
+               for (Integer col : bin)
+                       sgroups.add(new PlanningCoCodingGroup(col, groupColsInfo.get(col)));
+               return sgroups;
+       }
+
+       /**
+        * Identify columns to code together. Uses a greedy approach that merges
+        * pairs of column groups into larger groups. Each phase of the greedy
+        * algorithm considers all combinations of pairs to merge.
+        * 
+        * @param estim compressed size estimator
+        * @param numRows number of rows
+        * @param singletonGroups initial planning co-coding groups
+        * @return groups remaining once no merge reduces the estimated size anymore
+        */
+       private static PlanningCoCodingGroup[] findCocodesBruteForce(
+                       CompressedSizeEstimator estim, int numRows,
+                       PlanningCoCodingGroup[] singletonGroups) 
+       {
+               if( LOG.isTraceEnabled() )
+                       LOG.trace("Cocoding: process "+singletonGroups.length);
+
+               List<PlanningCoCodingGroup> workset = 
+                               new ArrayList<PlanningCoCodingGroup>(Arrays.asList(singletonGroups));
+
+               //establish memo table for extracted column groups
+               PlanningMemoTable memo = new PlanningMemoTable();
+
+               //process merging iterations until no more change
+               boolean changed = true;
+               while( changed && workset.size()>1 ) {
+                       //find best merge, incl memoization
+                       PlanningCoCodingGroup tmp = null;
+                       for( int i=0; i<workset.size(); i++ ) {
+                               for( int j=i+1; j<workset.size(); j++ ) {
+                                       PlanningCoCodingGroup c1 = workset.get(i);
+                                       PlanningCoCodingGroup c2 = workset.get(j);
+                                       memo.incrStats(1, 0, 0);
+
+                                       //pruning filter: skip dominated candidates
+                                       //NOTE(review): presumably relies on a merged group never being
+                                       //smaller than its larger input, which bounds the gain by the
+                                       //size of the smaller input -- confirm against the estimator
+                                       if( -Math.min(c1.getEstSize(), c2.getEstSize()) > memo.getOptChangeInSize() )
+                                               continue;
+
+                                       //memoization or newly created group (incl bitmap extraction)
+                                       PlanningCoCodingGroup c1c2 = memo.getOrCreate(c1, c2, estim, numRows);
+
+                                       //keep best merged group only (ties: prefer fewer columns)
+                                       if( tmp == null || c1c2.getChangeInSize() < tmp.getChangeInSize()
+                                               || (c1c2.getChangeInSize() == tmp.getChangeInSize() 
+                                                       && c1c2.getColIndices().length < tmp.getColIndices().length))
+                                               tmp = c1c2;
+                               }
+                       }
+
+                       //modify working set
+                       if( tmp != null && tmp.getChangeInSize() < 0 ) {
+                               workset.remove(tmp.getLeftGroup());
+                               workset.remove(tmp.getRightGroup());
+                               workset.add(tmp);
+                               memo.remove(tmp);
+
+                               if( LOG.isTraceEnabled() ) {
+                                       LOG.trace("--merge groups: "+Arrays.toString(tmp.getLeftGroup().getColIndices())+" and "
+                                                       +Arrays.toString(tmp.getRightGroup().getColIndices()));
+                               }
+                       }
+                       else {
+                               changed = false;
+                       }
+               }
+
+               if( LOG.isTraceEnabled() )
+                       LOG.trace("--stats: "+Arrays.toString(memo.getStats()));
+
+               return workset.toArray(new PlanningCoCodingGroup[0]);
+       }
+
+       /**
+        * Creates the column group partitioner for the given type.
+        * 
+        * @param type partitioner type
+        * @return new partitioner instance
+        */
+       private static ColumnGroupPartitioner createColumnGroupPartitioner(PartitionerType type) {
+               switch( type ) {
+                       case BIN_PACKING: 
+                               return new ColumnGroupPartitionerBinPacking();
+
+                       case STATIC:
+                               return new ColumnGroupPartitionerStatic();
+
+                       default:
+                               throw new RuntimeException(
+                                       "Unsupported column group partitioner: "+type.toString());
+               }
+       }
+
+       /**
+        * Per-column planning info: the ratio of estimated cardinality to the
+        * number of rows, and the minimum estimated compressed size.
+        */
+       public static class GroupableColInfo {
+               public final double cardRatio;
+               public final long size;
+
+               public GroupableColInfo(double lcardRatio, long lsize) {
+                       cardRatio = lcardRatio;
+                       size = lsize;
+               }
+       }
+
+       /**
+        * Task that runs the brute-force co-coding search for the groups of a single bin.
+        */
+       private static class CocodeTask implements Callable<PlanningCoCodingGroup[]> 
+       {
+               private final CompressedSizeEstimator _estim;
+               private final ArrayList<PlanningCoCodingGroup> _sgroups;
+               private final int _rlen;
+
+               protected CocodeTask( CompressedSizeEstimator estim, ArrayList<PlanningCoCodingGroup> sgroups, int rlen )  {
+                       _estim = estim;
+                       _sgroups = sgroups;
+                       _rlen = rlen;
+               }
+
+               @Override
+               public PlanningCoCodingGroup[] call() throws DMLRuntimeException {
+                       // brute force co-coding
+                       return findCocodesBruteForce(_estim, _rlen, 
+                                       _sgroups.toArray(new PlanningCoCodingGroup[0]));
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/37a215bc/src/main/java/org/apache/sysml/runtime/compress/cocode/PlanningCoCodingGroup.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/compress/cocode/PlanningCoCodingGroup.java
 
b/src/main/java/org/apache/sysml/runtime/compress/cocode/PlanningCoCodingGroup.java
new file mode 100644
index 0000000..caaa271
--- /dev/null
+++ 
b/src/main/java/org/apache/sysml/runtime/compress/cocode/PlanningCoCodingGroup.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.compress.cocode;
+
+import java.util.Arrays;
+
+import 
org.apache.sysml.runtime.compress.cocode.PlanningCoCoder.GroupableColInfo;
+import org.apache.sysml.runtime.compress.estim.CompressedSizeEstimator;
+import org.apache.sysml.runtime.compress.estim.CompressedSizeInfo;
+
+/** 
+ * Class to represent information about co-coding a group of columns. 
+ * 
+ */
+public class PlanningCoCodingGroup 
+{
+       private final int[] _colIndexes;
+       private final PlanningCoCodingGroup _leftGrp;
+       private final PlanningCoCodingGroup _rightGrp;
+
+       private final long _estSize;
+       private final double _cardRatio;
+
+
+       /**
+        * Constructor for a one-column group; i.e. do not co-code a given column.
+        * 
+        * @param col column
+        * @param info groupable column info
+        */
+       public PlanningCoCodingGroup(int col, GroupableColInfo info) {
+               _colIndexes = new int[]{col};
+               _estSize = info.size;
+               _cardRatio = info.cardRatio;
+               //single-column groups have no input groups
+               _leftGrp = null;
+               _rightGrp = null;
+       }
+
+       /**
+        * Constructor for merging two disjoint groups of columns
+        * 
+        * @param grp1   first group of columns to merge
+        * @param grp2   second group to merge
+        * @param estim  compressed size estimator, used to estimate the merged group
+        * @param numRows number of rows, used to normalize the estimated cardinality
+        */
+       public PlanningCoCodingGroup(PlanningCoCodingGroup grp1, PlanningCoCodingGroup grp2,
+                       CompressedSizeEstimator estim, int numRows) 
+       {
+               _colIndexes = getMergedIndexes(grp1._colIndexes, grp2._colIndexes);
+
+               // estimating size info
+               CompressedSizeInfo groupSizeInfo = estim
+                               .estimateCompressedColGroupSize(_colIndexes);
+               _estSize = groupSizeInfo.getMinSize();
+               _cardRatio = groupSizeInfo.getEstCard() / numRows;
+
+               _leftGrp = grp1;
+               _rightGrp = grp2;
+       }
+
+       /**
+        * Obtain the column indexes of this group.
+        * 
+        * @return column indexes (internal array, not copied)
+        */
+       public int[] getColIndices() {
+               return _colIndexes;
+       }
+
+       /**
+        * Obtain estimated compressed size of the grouped columns.
+        * 
+        * @return estimated compressed size of the grouped columns
+        */
+       public long getEstSize() {
+               return _estSize;
+       }
+
+       /**
+        * Obtain the change in estimated size caused by merging the two input
+        * groups, i.e., the size of this group minus the sizes of both inputs.
+        * Negative values indicate that the merge reduces the estimated size.
+        * 
+        * @return change in estimated size, or 0 for groups without two input groups
+        */
+       public double getChangeInSize() {
+               if( _leftGrp == null || _rightGrp == null )
+                       return 0;
+
+               return getEstSize() 
+                       - _leftGrp.getEstSize() 
+                       - _rightGrp.getEstSize();
+       }
+
+       public double getCardinalityRatio() {
+               return _cardRatio;
+       }
+
+       public PlanningCoCodingGroup getLeftGroup() {
+               return _leftGrp;
+       }
+
+       public PlanningCoCodingGroup getRightGroup() {
+               return _rightGrp;
+       }
+
+       @Override 
+       public int hashCode() {
+               return Arrays.hashCode(_colIndexes);
+       }
+
+       @Override 
+       public boolean equals(Object that) {
+               if( !(that instanceof PlanningCoCodingGroup) )
+                       return false;
+
+               PlanningCoCodingGroup thatgrp = (PlanningCoCodingGroup) that;
+               return Arrays.equals(_colIndexes, thatgrp._colIndexes);
+       }
+
+       @Override
+       public String toString() {
+               return Arrays.toString(_colIndexes);
+       }
+
+       /**
+        * Merges two sorted index arrays into a new sorted array that holds
+        * all entries of both inputs. Either input may be empty.
+        * 
+        * @param indexes1 first sorted array of column indexes
+        * @param indexes2 second sorted array of column indexes
+        * @return new sorted array of length indexes1.length + indexes2.length
+        */
+       public static int[] getMergedIndexes(int[] indexes1, int[] indexes2) {
+               int[] ret = new int[indexes1.length + indexes2.length];
+               int grp1Ptr = 0, grp2Ptr = 0, mergedIx = 0;
+
+               // merge while both inputs have remaining entries
+               while (grp1Ptr < indexes1.length && grp2Ptr < indexes2.length) {
+                       if (indexes1[grp1Ptr] < indexes2[grp2Ptr])
+                               ret[mergedIx++] = indexes1[grp1Ptr++];
+                       else
+                               ret[mergedIx++] = indexes2[grp2Ptr++];
+               }
+
+               // append the remainder of the input that is not yet exhausted
+               if (grp1Ptr < indexes1.length)
+                       System.arraycopy(indexes1, grp1Ptr, ret, mergedIx, indexes1.length - grp1Ptr);
+               else if (grp2Ptr < indexes2.length)
+                       System.arraycopy(indexes2, grp2Ptr, ret, mergedIx, indexes2.length - grp2Ptr);
+
+               return ret;
+       }
+
+       /**
+        * Wrapper of a column index array with value-based equals and hashCode,
+        * which makes index arrays usable as hash map keys.
+        */
+       public static class ColIndexes {
+               final int[] _colIndexes;
+
+               public ColIndexes(int[] colIndexes) {
+                       _colIndexes = colIndexes;
+               }
+
+               @Override 
+               public int hashCode() {
+                       return Arrays.hashCode(_colIndexes);
+               }
+
+               @Override 
+               public boolean equals(Object that) {
+                       if( !(that instanceof ColIndexes) )
+                               return false;
+
+                       ColIndexes thatgrp = (ColIndexes) that;
+                       return Arrays.equals(_colIndexes, thatgrp._colIndexes);
+               }
+       }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/37a215bc/src/main/java/org/apache/sysml/runtime/compress/cocode/PlanningMemoTable.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/compress/cocode/PlanningMemoTable.java 
b/src/main/java/org/apache/sysml/runtime/compress/cocode/PlanningMemoTable.java
new file mode 100644
index 0000000..3f683c2
--- /dev/null
+++ 
b/src/main/java/org/apache/sysml/runtime/compress/cocode/PlanningMemoTable.java
@@ -0,0 +1,75 @@
+package org.apache.sysml.runtime.compress.cocode;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map.Entry;
+
+import 
org.apache.sysml.runtime.compress.cocode.PlanningCoCodingGroup.ColIndexes;
+import org.apache.sysml.runtime.compress.estim.CompressedSizeEstimator;
+
+public class PlanningMemoTable 
+{
+       //memoized merged groups, keyed by their merged column indexes
+       private HashMap<ColIndexes,PlanningCoCodingGroup> _memo = new HashMap<ColIndexes,PlanningCoCodingGroup>();
+       //minimum change in size over the memoized groups (never above 0)
+       private double _optChangeInSize = 0; 
+       //three counters, incremented via incrStats
+       private int[] _stats = new int[3];
+
+       /**
+        * Returns the memoized merge of the two given groups, or creates,
+        * memoizes, and returns it if no group with the same merged column
+        * indexes exists yet.
+        * 
+        * @param c1 first group
+        * @param c2 second group
+        * @param estim compressed size estimator, used for newly created groups
+        * @param numRows number of rows
+        * @return merged group of c1 and c2
+        */
+       public PlanningCoCodingGroup getOrCreate(PlanningCoCodingGroup c1, PlanningCoCodingGroup c2, CompressedSizeEstimator estim, int numRows) 
+       {
+               int[] mergedIx = PlanningCoCodingGroup
+                               .getMergedIndexes(c1.getColIndices(), c2.getColIndices());
+               ColIndexes key = new ColIndexes(mergedIx);
+
+               //probe memo table for existing column group (avoid extraction)
+               PlanningCoCodingGroup merged = _memo.get(key);
+               incrStats(0, 1, 0); //probed plans
+               if( merged != null )
+                       return merged;
+
+               //create non-existing group and maintain global stats
+               merged = new PlanningCoCodingGroup(c1, c2, estim, numRows);
+               _memo.put(key, merged);
+               _optChangeInSize = Math.min(_optChangeInSize, merged.getChangeInSize());
+               incrStats(0, 0, 1); //created plans
+
+               return merged;
+       }
+
+       /**
+        * Removes the given merged group, its two input groups, and all
+        * memoized groups that were built from one of these input groups,
+        * and recomputes the minimum change in size over the remaining entries.
+        * 
+        * @param grp merged group whose inputs are no longer available
+        */
+       public void remove(PlanningCoCodingGroup grp) {
+               //remove atomic groups
+               _memo.remove(new ColIndexes(grp.getColIndices()));
+               int[] left = grp.getLeftGroup().getColIndices();
+               _memo.remove(new ColIndexes(left));
+               int[] right = grp.getRightGroup().getColIndices();
+               _memo.remove(new ColIndexes(right));
+
+               _optChangeInSize = 0;
+
+               //remove overlapping groups and recompute min size
+               Iterator<Entry<ColIndexes,PlanningCoCodingGroup>> iter 
+                       = _memo.entrySet().iterator();
+               while( iter.hasNext() ) {
+                       PlanningCoCodingGroup cand = iter.next().getValue();
+                       boolean overlaps = 
+                               matchesEither(cand.getLeftGroup().getColIndices(), left, right)
+                               || matchesEither(cand.getRightGroup().getColIndices(), left, right);
+                       if( overlaps )
+                               iter.remove();
+                       else
+                               _optChangeInSize = Math.min(_optChangeInSize, cand.getChangeInSize());
+               }
+       }
+
+       /**
+        * Checks if the given indexes are equal to one of the two reference index arrays.
+        */
+       private static boolean matchesEither(int[] indexes, int[] ref1, int[] ref2) {
+               return Arrays.equals(indexes, ref1) || Arrays.equals(indexes, ref2);
+       }
+
+       /**
+        * Adds the given increments to the three counters.
+        * 
+        * @param v1 increment of the first counter
+        * @param v2 increment of the second counter
+        * @param v3 increment of the third counter
+        */
+       public void incrStats(int v1, int v2, int v3) {
+               int[] incr = new int[]{v1, v2, v3};
+               for( int i=0; i<incr.length; i++ )
+                       _stats[i] += incr[i];
+       }
+
+       public double getOptChangeInSize() {
+               return _optChangeInSize;
+       }
+
+       /**
+        * Obtain the counters maintained via incrStats.
+        * 
+        * @return internal counter array (not copied)
+        */
+       public int[] getStats() {
+               return _stats;
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/37a215bc/src/main/java/org/apache/sysml/runtime/compress/estim/CompressedSizeEstimator.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/compress/estim/CompressedSizeEstimator.java
 
b/src/main/java/org/apache/sysml/runtime/compress/estim/CompressedSizeEstimator.java
index 2b49403..4c470e2 100644
--- 
a/src/main/java/org/apache/sysml/runtime/compress/estim/CompressedSizeEstimator.java
+++ 
b/src/main/java/org/apache/sysml/runtime/compress/estim/CompressedSizeEstimator.java
@@ -20,6 +20,7 @@
 package org.apache.sysml.runtime.compress.estim;
 
 import org.apache.sysml.runtime.compress.BitmapEncoder;
+import org.apache.sysml.runtime.compress.CompressedMatrixBlock;
 import org.apache.sysml.runtime.compress.UncompressedBitmap;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 
@@ -29,9 +30,16 @@ import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 public abstract class CompressedSizeEstimator 
 {
        protected MatrixBlock _data;
+       protected final int _numRows;
 
        public CompressedSizeEstimator(MatrixBlock data) {
                _data = data;
+               //NOTE(review): presumably TRANSPOSE_INPUT means the data block is held
+               //transposed, so its columns are the logical rows -- confirm
+               _numRows = CompressedMatrixBlock.TRANSPOSE_INPUT ? 
+                               _data.getNumColumns() : _data.getNumRows();
+       }
+       
+       public int getNumRows() {
+               //row count as determined once at construction time
+               return _numRows;
        }
 
        public abstract CompressedSizeInfo estimateCompressedColGroupSize(int[] 
colIndexes);
@@ -47,15 +55,19 @@ public abstract class CompressedSizeEstimator
                
                //compute size estimation factors
                for (int i = 0; i < numVals; i++) {
-                       int[] list = ubm.getOffsetsList(i);
-                       numOffs += list.length;
-                       numSegs += list[list.length - 1] / 
BitmapEncoder.BITMAP_BLOCK_SZ + 1;
-                       numSingle += (list.length==1) ? 1 : 0;
+                       int[] list = ubm.getOffsetsList(i).extractValues();
+                       int listSize = ubm.getNumOffsets(i);
+                       numOffs += listSize;
+                       numSegs += list[listSize - 1] / 
BitmapEncoder.BITMAP_BLOCK_SZ + 1;
+                       numSingle += (listSize==1) ? 1 : 0;
                        if( inclRLE ) {
                                int lastOff = -2;
-                               for (int j = 0; j < list.length; j++) {
-                                       if (list[j] != lastOff + 1)
-                                               numRuns++;
+                               for (int j = 0; j < listSize; j++) {
+                                       if( list[j] != lastOff + 1 ) {
+                                               numRuns++; //new run
+                                               numRuns += (list[j]-lastOff) / 
//empty runs
+                                                               
BitmapEncoder.BITMAP_BLOCK_SZ;
+                                       }
                                        lastOff = list[j];
                                }
                        }
@@ -107,6 +119,27 @@ public abstract class CompressedSizeEstimator
                ret += 2 * numSeqs;
                return ret;
        }
+       
+       /**
+        * Estimates the number of bytes needed to encode this column group 
+        * in DDC1 or DDC2 format.
+        * 
+        * @param numVals number of value tuples
+        * @param numRows number of rows
+        * @param numCols number of columns
+        * @return number of bytes to encode column group in DDC format, or
+        *         Long.MAX_VALUE if numVals is larger than Character.MAX_VALUE-1
+        */
+       protected static long getDDCSize(int numVals, int numRows, int numCols) {
+               if( numVals > Character.MAX_VALUE-1 )
+                       return Long.MAX_VALUE;
+               
+               //accumulate in long: the int products can overflow for wide groups
+               long ret = 0;
+               //distinct value tuples [double per col]
+               ret += 8L * numVals * numCols;
+               //data [byte or char per row]
+               ret += ((numVals>255) ? 2L : 1L) * numRows;
+               return ret;
+       }
 
        protected static class SizeEstimationFactors {
                protected int numVals;   //num value tuples

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/37a215bc/src/main/java/org/apache/sysml/runtime/compress/estim/CompressedSizeEstimatorExact.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/compress/estim/CompressedSizeEstimatorExact.java
 
b/src/main/java/org/apache/sysml/runtime/compress/estim/CompressedSizeEstimatorExact.java
index d24255d..3677c23 100644
--- 
a/src/main/java/org/apache/sysml/runtime/compress/estim/CompressedSizeEstimatorExact.java
+++ 
b/src/main/java/org/apache/sysml/runtime/compress/estim/CompressedSizeEstimatorExact.java
@@ -46,8 +46,9 @@ public class CompressedSizeEstimatorExact extends 
CompressedSizeEstimator
                SizeEstimationFactors fact = computeSizeEstimationFactors(ubm, 
true);
                
                //construct new size info summary
-               return new CompressedSizeInfo(fact.numVals,
+               return new CompressedSizeInfo(fact.numVals, fact.numOffs,
                                getRLESize(fact.numVals, fact.numRuns, 
ubm.getNumColumns()),
-                               getOLESize(fact.numVals, fact.numOffs, 
fact.numSegs, ubm.getNumColumns()));
+                               getOLESize(fact.numVals, fact.numOffs, 
fact.numSegs, ubm.getNumColumns()),
+                               getDDCSize(fact.numVals, _numRows, 
ubm.getNumColumns()));
        }
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/37a215bc/src/main/java/org/apache/sysml/runtime/compress/estim/CompressedSizeEstimatorSample.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/compress/estim/CompressedSizeEstimatorSample.java
 
b/src/main/java/org/apache/sysml/runtime/compress/estim/CompressedSizeEstimatorSample.java
index eb0040f..a59893d 100644
--- 
a/src/main/java/org/apache/sysml/runtime/compress/estim/CompressedSizeEstimatorSample.java
+++ 
b/src/main/java/org/apache/sysml/runtime/compress/estim/CompressedSizeEstimatorSample.java
@@ -21,103 +21,106 @@ package org.apache.sysml.runtime.compress.estim;
 
 import java.util.Arrays;
 import java.util.HashMap;
-import java.util.HashSet;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.math3.analysis.UnivariateFunction;
+import org.apache.commons.math3.analysis.solvers.UnivariateSolverUtils;
 import org.apache.commons.math3.distribution.ChiSquaredDistribution;
 import org.apache.commons.math3.random.RandomDataGenerator;
-import org.apache.sysml.hops.OptimizerUtils;
+import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.compress.BitmapEncoder;
 import org.apache.sysml.runtime.compress.ReaderColumnSelection;
 import org.apache.sysml.runtime.compress.CompressedMatrixBlock;
-import org.apache.sysml.runtime.compress.ReaderColumnSelectionDense;
-import org.apache.sysml.runtime.compress.ReaderColumnSelectionDenseSample;
-import org.apache.sysml.runtime.compress.ReaderColumnSelectionSparse;
 import org.apache.sysml.runtime.compress.UncompressedBitmap;
 import org.apache.sysml.runtime.compress.utils.DblArray;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 
 public class CompressedSizeEstimatorSample extends CompressedSizeEstimator 
 {
-       private static final boolean CORRECT_NONZERO_ESTIMATE = false; //TODO 
enable for production
        private final static double SHLOSSER_JACKKNIFE_ALPHA = 0.975;
-       public static final float HAAS_AND_STOKES_ALPHA1 = 0.9F; //0.9 
recommended in paper
-       public static final float HAAS_AND_STOKES_ALPHA2 = 30F; //30 
recommended in paper
-       public static final float HAAS_AND_STOKES_UJ2A_C = 50; //50 recommend 
in paper
-
-       private int[] _sampleRows = null;
-       private RandomDataGenerator _rng = null;
-       private int _numRows = -1;
-
-       public CompressedSizeEstimatorSample(MatrixBlock data, int[] 
sampleRows) {
+       public static final double HAAS_AND_STOKES_ALPHA1 = 0.9; //0.9 
recommended in paper
+       public static final double HAAS_AND_STOKES_ALPHA2 = 30; //30 
recommended in paper
+       public static final int HAAS_AND_STOKES_UJ2A_C = 50; //50 recommended in 
paper
+       public static final boolean HAAS_AND_STOKES_UJ2A_CUT2 = true; //cut 
frequency in half
+       public static final boolean HAAS_AND_STOKES_UJ2A_SOLVE = true; //true 
recommended
+       public static final int MAX_SOLVE_CACHE_SIZE = 64*1024; //global 2MB 
cache
+       //note: we use a relatively high ALPHA2 and the cut-in-half approach 
because it
+       //leads to moderate overestimation (compared to systematic 
underestimation) in
+       //order to follow a conservative approach
+       
+       private static final Log LOG = 
LogFactory.getLog(CompressedSizeEstimatorSample.class.getName());
+
+       private static ThreadLocal<RandomDataGenerator> _rng = new 
ThreadLocal<RandomDataGenerator>() {
+        protected RandomDataGenerator initialValue() { return new 
RandomDataGenerator(); }
+    };
+    
+    private int[] _sampleRows = null;
+    private HashMap<Integer, Double> _solveCache = null;
+       
+    
+       public CompressedSizeEstimatorSample(MatrixBlock data, int sampleSize) 
+               throws DMLRuntimeException 
+       {
                super(data);
-               _sampleRows = sampleRows;
-               _rng = new RandomDataGenerator();
-               _numRows = CompressedMatrixBlock.TRANSPOSE_INPUT ? 
-                               _data.getNumColumns() : _data.getNumRows();
-       }
-
-       public CompressedSizeEstimatorSample(MatrixBlock mb, int sampleSize) {
-               this(mb, null);
+               
+               //get sample of rows, incl eager extraction 
                _sampleRows = getSortedUniformSample(_numRows, sampleSize);
-       }
-
-       /**
-        * set the sample rows (assumed to be sorted)
-        * 
-        * @param sampleRows sample rows, assumed to be sorted
-        */
-       public void setSampleRows(int[] sampleRows) {
-               _sampleRows = sampleRows;
+               if( SizeEstimatorFactory.EXTRACT_SAMPLE_ONCE ) {
+                       MatrixBlock select = new MatrixBlock(_numRows, 1, 
false);
+                       for( int i=0; i<sampleSize; i++ )
+                               select.quickSetValue(_sampleRows[i], 0, 1);
+                       _data = _data.removeEmptyOperations(new MatrixBlock(), 
+                                       !CompressedMatrixBlock.TRANSPOSE_INPUT, 
select);
+               }
+               
+               //establish estimator-local cache for numeric solve
+               _solveCache = new HashMap<Integer, Double>();
        }
 
        @Override
        public CompressedSizeInfo estimateCompressedColGroupSize(int[] 
colIndexes) 
        {
+               int sampleSize = _sampleRows.length;
+               int numCols = colIndexes.length;
+               int[] sampleRows = _sampleRows;
+               
                //extract statistics from sample
-               UncompressedBitmap ubm = BitmapEncoder.extractBitmapFromSample(
-                               colIndexes, _data, _sampleRows);
+               UncompressedBitmap ubm = 
SizeEstimatorFactory.EXTRACT_SAMPLE_ONCE ?
+                               BitmapEncoder.extractBitmap(colIndexes, _data) :
+                               
BitmapEncoder.extractBitmapFromSample(colIndexes, _data, sampleRows);
                SizeEstimationFactors fact = computeSizeEstimationFactors(ubm, 
false);
-
-               //estimate number of distinct values 
-               int totalCardinality = getNumDistinctValues(colIndexes);
-               totalCardinality = Math.max(totalCardinality, fact.numVals); 
//fix anomalies w/ large sample fraction
-               totalCardinality = Math.min(totalCardinality, _numRows); //fix 
anomalies w/ large sample fraction
                
-               //estimate unseen values
-               // each unseen is assumed to occur only once (it did not show 
up in the sample because it is rare)
-               int unseen = Math.max(0, totalCardinality - fact.numVals);
-               int sampleSize = _sampleRows.length;
-               
-               //estimate number of offsets
-               double sparsity = OptimizerUtils.getSparsity(
-                               _data.getNumRows(), _data.getNumColumns(), 
_data.getNonZeros());
+               //estimate number of distinct values (incl fixes for anomalies 
w/ large sample fraction)
+               int totalCardinality = getNumDistinctValues(ubm, _numRows, 
sampleRows, _solveCache);
+               totalCardinality = Math.max(totalCardinality, fact.numVals);
+               totalCardinality = Math.min(totalCardinality, _numRows); 
                
-               // expected value given that we don't store the zero values
-               float totalNumOffs = (float) (_numRows * (1 - Math.pow(1 - 
sparsity,colIndexes.length)));               
-               if( CORRECT_NONZERO_ESTIMATE ) {
-                       long numZeros = sampleSize - fact.numOffs;
-                       float C = Math.max(1-(float)fact.numSingle/sampleSize, 
(float)sampleSize/_numRows); 
-                       totalNumOffs = _numRows - ((numZeros>0)? 
(float)_numRows/sampleSize*C*numZeros : 0);
-               }
+               //estimate unseen values
+               int unseenVals = totalCardinality - fact.numVals;
                
-               // For a single offset, the number of blocks depends on the 
value of
-               // that offset. small offsets (first group of rows in the 
matrix)
-               // require a small number of blocks and large offsets (last 
group of
-               // rows) require a large number of blocks. The unseen offsets 
are
-               // distributed over the entire offset range. A reasonable and 
fast
-               // estimate for the number of blocks is to use the arithmetic 
mean of
-               // the number of blocks used for the first index (=1) and that 
of the
-               // last index.
-               int numUnseenSeg = Math.round(unseen
-                               * (2.0f * BitmapEncoder.BITMAP_BLOCK_SZ + 
_numRows) / 2
-                               / BitmapEncoder.BITMAP_BLOCK_SZ);
+               //estimate number of non-zeros (conservatively round up)
+               double C = Math.max(1 - (double)fact.numSingle/sampleSize, 
(double)sampleSize/_numRows); 
+               int numZeros = sampleSize - fact.numOffs; //>=0
+               int numNonZeros = (int)Math.ceil(_numRows - 
(double)_numRows/sampleSize * C * numZeros);
+               numNonZeros = Math.max(numNonZeros, totalCardinality); //handle 
anomaly of zi=0
+
+               if( totalCardinality<=0 || unseenVals<0 || numZeros<0 || 
numNonZeros<=0 )
+                       LOG.warn("Invalid estimates detected for 
"+Arrays.toString(colIndexes)+": "
+                                       +totalCardinality+" "+unseenVals+" 
"+numZeros+" "+numNonZeros);
+                       
+               // estimate number of segments and number of runs incl 
correction for
+               // empty segments and empty runs (via expected mean of offset 
value)
+               int numUnseenSeg = (int) (unseenVals * 
+                       
Math.ceil((double)_numRows/BitmapEncoder.BITMAP_BLOCK_SZ/2));
                int totalNumSeg = fact.numSegs + numUnseenSeg;
-               int totalNumRuns = getNumRuns(ubm, sampleSize, _numRows) + 
unseen;
+               int totalNumRuns = getNumRuns(ubm, sampleSize, _numRows, 
sampleRows) + numUnseenSeg;
 
                //construct new size info summary
-               return new CompressedSizeInfo(totalCardinality,
-                               getRLESize(totalCardinality, totalNumRuns, 
colIndexes.length),
-                               getOLESize(totalCardinality, totalNumOffs, 
totalNumSeg, colIndexes.length));
+               return new CompressedSizeInfo(totalCardinality, numNonZeros,
+                               getRLESize(totalCardinality, totalNumRuns, 
numCols),
+                               getOLESize(totalCardinality, numNonZeros, 
totalNumSeg, numCols),
+                               getDDCSize(totalCardinality, _numRows, 
numCols));
        }
 
        @Override
@@ -127,47 +130,50 @@ public class CompressedSizeEstimatorSample extends 
CompressedSizeEstimator
                SizeEstimationFactors fact = computeSizeEstimationFactors(ubm, 
true);
                
                //construct new size info summary
-               return new CompressedSizeInfo(fact.numVals,
+               return new CompressedSizeInfo(fact.numVals, fact.numOffs,
                                getRLESize(fact.numVals, fact.numRuns, 
ubm.getNumColumns()),
-                               getOLESize(fact.numVals, fact.numOffs, 
fact.numSegs, ubm.getNumColumns()));
+                               getOLESize(fact.numVals, fact.numOffs, 
fact.numSegs, ubm.getNumColumns()),
+                               getDDCSize(fact.numVals, _numRows, 
ubm.getNumColumns()));
        }
 
-       private int getNumDistinctValues(int[] colIndexes) {
-               return haasAndStokes(colIndexes);
+       private static int getNumDistinctValues(UncompressedBitmap ubm, int 
numRows, int[] sampleRows, 
+                       HashMap<Integer, Double> solveCache) {
+               return haasAndStokes(ubm, numRows, sampleRows.length, 
solveCache);
        }
 
-       private int getNumRuns(UncompressedBitmap sampleUncompressedBitmap,
-                       int sampleSize, int totalNumRows) {
-               int numVals = sampleUncompressedBitmap.getNumValues();
+       private static int getNumRuns(UncompressedBitmap ubm,
+                       int sampleSize, int totalNumRows, int[] sampleRows) {
+               int numVals = ubm.getNumValues();
                // all values in the sample are zeros
                if (numVals == 0)
                        return 0;
-               float numRuns = 0;
+               double numRuns = 0;
                for (int vi = 0; vi < numVals; vi++) {
-                       int[] offsets = 
sampleUncompressedBitmap.getOffsetsList(vi);
-                       float offsetsRatio = ((float) offsets.length) / 
sampleSize;
-                       float avgAdditionalOffsets = offsetsRatio * totalNumRows
+                       int[] offsets = ubm.getOffsetsList(vi).extractValues();
+                       int offsetsSize = ubm.getNumOffsets(vi);
+                       double offsetsRatio = ((double) offsetsSize) / 
sampleSize;
+                       double avgAdditionalOffsets = offsetsRatio * 
totalNumRows
                                        / sampleSize;
                        if (avgAdditionalOffsets < 1) {
                                // Ising-Stevens does not hold
                                // fall-back to using the expected number of 
offsets as an upper
                                // bound on the number of runs
-                               numRuns += ((float) offsets.length) * 
totalNumRows / sampleSize;
+                               numRuns += ((double) offsetsSize) * 
totalNumRows / sampleSize;
                                continue;
                        }
                        int intervalEnd, intervalSize;
-                       float additionalOffsets;
+                       double additionalOffsets;
                        // probability of an index being non-offset in current 
and previous
                        // interval respectively
-                       float nonOffsetProb, prevNonOffsetProb = 1;
+                       double nonOffsetProb, prevNonOffsetProb = 1;
                        boolean reachedSampleEnd = false;
                        // handling the first interval separately for simplicity
                        int intervalStart = -1;
-                       if (_sampleRows[0] == 0) {
+                       if (sampleRows[0] == 0) {
                                // empty interval
                                intervalStart = 0;
                        } else {
-                               intervalEnd = _sampleRows[0];
+                               intervalEnd = sampleRows[0];
                                intervalSize = intervalEnd - intervalStart - 1;
                                // expected value of a multivariate 
hypergeometric distribution
                                additionalOffsets = offsetsRatio * intervalSize;
@@ -188,7 +194,7 @@ public class CompressedSizeEstimatorSample extends 
CompressedSizeEstimator
                                // intervalStart will always be pointing at the 
current value
                                // in the separator block
 
-                               if (offsetsPtrs < offsets.length
+                               if (offsetsPtrs < offsetsSize
                                                && offsets[offsetsPtrs] == 
intervalStart) {
                                        startedWithOffset = true;
                                        offsetsPtrs++;
@@ -197,10 +203,10 @@ public class CompressedSizeEstimatorSample extends 
CompressedSizeEstimator
                                        seenNonOffset = true;
                                        endedWithOffset = false;
                                }
-                               while (intervalStart + 1 == _sampleRows[ix]) {
-                                       intervalStart = _sampleRows[ix];
+                               while (intervalStart + 1 == sampleRows[ix]) {
+                                       intervalStart = sampleRows[ix];
                                        if (seenNonOffset) {
-                                               if (offsetsPtrs < offsets.length
+                                               if (offsetsPtrs < offsetsSize
                                                                && 
offsets[offsetsPtrs] == intervalStart) {
                                                        withinSepRun = 1;
                                                        offsetsPtrs++;
@@ -210,7 +216,7 @@ public class CompressedSizeEstimatorSample extends 
CompressedSizeEstimator
                                                        withinSepRun = 0;
                                                        endedWithOffset = false;
                                                }
-                                       } else if (offsetsPtrs < offsets.length
+                                       } else if (offsetsPtrs < offsetsSize
                                                        && offsets[offsetsPtrs] 
== intervalStart) {
                                                offsetsPtrs++;
                                                endedWithOffset = true;
@@ -230,7 +236,7 @@ public class CompressedSizeEstimatorSample extends 
CompressedSizeEstimator
                                // runs within an interval of unknowns
                                if (reachedSampleEnd)
                                        break;
-                               intervalEnd = _sampleRows[ix];
+                               intervalEnd = sampleRows[ix];
                                intervalSize = intervalEnd - intervalStart - 1;
                                // expected value of a multivariate 
hypergeometric distribution
                                additionalOffsets = offsetsRatio * intervalSize;
@@ -280,7 +286,7 @@ public class CompressedSizeEstimatorSample extends 
CompressedSizeEstimator
                        }
                        // additional runs resulting from x's on the boundaries 
of the
                        // separators
-                       endedWithOffset = intervalStart == 
offsets[offsets.length - 1];
+                       endedWithOffset = intervalStart == offsets[offsetsSize 
- 1];
                        if (seenNonOffset) {
                                if (startedWithOffset) {
                                        numRuns += prevNonOffsetProb;
@@ -296,31 +302,7 @@ public class CompressedSizeEstimatorSample extends 
CompressedSizeEstimator
                                        numRuns += prevNonOffsetProb * 
nonOffsetProb;
                        }
                }
-               return Math.round(numRuns);
-       }
-
-       private int haasAndStokes(int[] colIndexes) {
-               ReaderColumnSelection reader =  new 
ReaderColumnSelectionDenseSample(_data, 
-                               colIndexes, _sampleRows, 
!CompressedMatrixBlock.MATERIALIZE_ZEROS);
-               return haasAndStokes(_numRows, _sampleRows.length, reader);
-       }
-
-       /**
-        * TODO remove, just for local debugging.
-        * 
-        * @param colIndexes column indexes
-        * @return exact number of district values
-        */
-       @SuppressWarnings("unused")
-       private int getExactNumDistinctValues(int[] colIndexes) {
-               HashSet<DblArray> distinctVals = new HashSet<DblArray>();
-               ReaderColumnSelection reader = (_data.isInSparseFormat() && 
CompressedMatrixBlock.TRANSPOSE_INPUT) ? 
-                               new ReaderColumnSelectionSparse(_data, 
colIndexes, !CompressedMatrixBlock.MATERIALIZE_ZEROS) : 
-                               new ReaderColumnSelectionDense(_data, 
colIndexes, !CompressedMatrixBlock.MATERIALIZE_ZEROS);
-               DblArray val = null;
-               while (null != (val = reader.nextRow()))
-                       distinctVals.add(val);
-               return distinctVals.size();
+               return (int)Math.min(Math.round(numRuns), Integer.MAX_VALUE);
        }
 
        /**
@@ -330,10 +312,11 @@ public class CompressedSizeEstimatorSample extends 
CompressedSizeEstimator
         * @param smplSize sample size
         * @return sorted array of integers
         */
-       private int[] getSortedUniformSample(int range, int smplSize) {
+       private static int[] getSortedUniformSample(int range, int smplSize) {
                if (smplSize == 0)
                        return new int[] {};
-               int[] sample = _rng.nextPermutation(range, smplSize);
+               RandomDataGenerator rng = _rng.get();
+               int[] sample = rng.nextPermutation(range, smplSize);
                Arrays.sort(sample);
                return sample;
        }
@@ -380,22 +363,13 @@ public class CompressedSizeEstimatorSample extends 
CompressedSizeEstimator
         * @param sampleRowsReader reader
         * @return estimator
         */
-       @SuppressWarnings("unused")
-       private static int shlosserEstimator(int nRows, int sampleSize,
-                       ReaderColumnSelection sampleRowsReader) 
-       {
-               return shlosserEstimator(nRows, sampleSize, sampleRowsReader,
-                               getValCounts(sampleRowsReader));
-       }
-
-       private static int shlosserEstimator(int nRows, int sampleSize,
-                       ReaderColumnSelection sampleRowsReader,
-                       HashMap<DblArray, Integer> valsCount) 
+       private static int shlosserEstimator(UncompressedBitmap ubm, int nRows, 
int sampleSize) 
        {
                double q = ((double) sampleSize) / nRows;
                double oneMinusQ = 1 - q;
 
-               int[] freqCounts = getFreqCounts(valsCount);
+               int numVals = ubm.getNumValues();
+               int[] freqCounts = getFreqCounts(ubm);
 
                double numerSum = 0, denomSum = 0;
                int iPlusOne = 1;
@@ -403,7 +377,7 @@ public class CompressedSizeEstimatorSample extends 
CompressedSizeEstimator
                        numerSum += Math.pow(oneMinusQ, iPlusOne) * 
freqCounts[i];
                        denomSum += iPlusOne * q * Math.pow(oneMinusQ, i) * 
freqCounts[i];
                }
-               int estimate = (int) Math.round(valsCount.size() + freqCounts[0]
+               int estimate = (int) Math.round(numVals + freqCounts[0]
                                * numerSum / denomSum);
                return estimate < 1 ? 1 : estimate;
        }
@@ -418,25 +392,16 @@ public class CompressedSizeEstimatorSample extends 
CompressedSizeEstimator
         * @param sampleRowsReader row reader
         * @return estimator
         */
-       @SuppressWarnings("unused")
-       private static int smoothedJackknifeEstimator(int nRows, int sampleSize,
-                       ReaderColumnSelection sampleRowsReader) 
-       {
-               return smoothedJackknifeEstimator(nRows, sampleSize, 
sampleRowsReader,
-                               getValCounts(sampleRowsReader));
-       }
-
-       private static int smoothedJackknifeEstimator(int nRows, int sampleSize,
-                       ReaderColumnSelection sampleRowsReader,
-                       HashMap<DblArray, Integer> valsCount) 
+       private static int smoothedJackknifeEstimator(UncompressedBitmap ubm, 
int nRows, int sampleSize) 
        {
-               int[] freqCounts = getFreqCounts(valsCount);
+               int numVals = ubm.getNumValues();
+               int[] freqCounts = getFreqCounts(ubm);
                // all values in the sample are zeros
                if (freqCounts.length == 0)
                        return 0;
                // nRows is N and sampleSize is n
 
-               int d = valsCount.size();
+               int d = numVals;
                double f1 = freqCounts[0];
                int Nn = nRows * sampleSize;
                double D0 = (d - f1 / sampleSize)
@@ -515,43 +480,31 @@ public class CompressedSizeEstimatorSample extends 
CompressedSizeEstimator
         * @return estimator
         */
        @SuppressWarnings("unused")
-       private static int shlosserJackknifeEstimator(int nRows, int sampleSize,
-                       ReaderColumnSelection sampleRowsReader) {
-               HashMap<DblArray, Integer> valsCount = 
getValCounts(sampleRowsReader);
-
+       private static int shlosserJackknifeEstimator(UncompressedBitmap ubm, 
int nRows, int sampleSize) 
+       {
+               int numVals = ubm.getNumValues();
+               CriticalValue cv = computeCriticalValue(sampleSize);
+               
                // uniformity chi-square test
-               double nBar = ((double) sampleSize) / valsCount.size();
+               double nBar = ((double) sampleSize) / numVals;
                // test-statistic
                double u = 0;
-               for (int cnt : valsCount.values()) {
-                       u += Math.pow(cnt - nBar, 2);
+               for( int i=0; i<numVals; i++ ) {
+                       u += Math.pow(ubm.getNumOffsets(i) - nBar, 2);
                }
                u /= nBar;
-               if (sampleSize != usedSampleSize)
+               if (sampleSize != cv.usedSampleSize)
                        computeCriticalValue(sampleSize);
-               if (u < uniformityCriticalValue) {
-                       // uniform
-                       return smoothedJackknifeEstimator(nRows, sampleSize,
-                                       sampleRowsReader, valsCount);
-               } else {
-                       return shlosserEstimator(nRows, sampleSize, 
sampleRowsReader,
-                                       valsCount);
-               }
+               if (u < cv.uniformityCriticalValue) // uniform
+                       return smoothedJackknifeEstimator(ubm, nRows, 
sampleSize);
+               else 
+                       return shlosserEstimator(ubm, nRows, sampleSize);
        }
 
-       /*
-        * In the shlosserSmoothedJackknifeEstimator as long as the sample size 
did
-        * not change, we will have the same critical value each time the 
estimator
-        * is used (given that alpha is the same). We cache the critical value 
to
-        * avoid recomputing it in each call.
-        */
-       private static double uniformityCriticalValue;
-       private static int usedSampleSize;
-       
-       private static void computeCriticalValue(int sampleSize) {
+       private static CriticalValue computeCriticalValue(int sampleSize) {
                ChiSquaredDistribution chiSqr = new 
ChiSquaredDistribution(sampleSize - 1);
-               uniformityCriticalValue = 
chiSqr.inverseCumulativeProbability(SHLOSSER_JACKKNIFE_ALPHA);
-               usedSampleSize = sampleSize;
+               return new CriticalValue(
+                       
chiSqr.inverseCumulativeProbability(SHLOSSER_JACKKNIFE_ALPHA), sampleSize);
        }
 
        /**
@@ -563,115 +516,43 @@ public class CompressedSizeEstimatorSample extends 
CompressedSizeEstimator
         * 
         * @param nRows number of rows
         * @param sampleSize sample size
+        * @param solveCache cache of numerically solved method-of-moments estimates, keyed by observed class frequency
         * @param sampleRowsReader row reader
         * @return estimator
         */
-       private static int haasAndStokes(int nRows, int sampleSize,
-                       ReaderColumnSelection sampleRowsReader) 
+       private static int haasAndStokes(UncompressedBitmap ubm, int nRows, int 
sampleSize, HashMap<Integer, Double> solveCache)
        {
-               HashMap<DblArray, Integer> valsCount = 
getValCounts(sampleRowsReader);
+               //obtain value and frequency histograms
+               int numVals = ubm.getNumValues();
+               int[] freqCounts = getFreqCounts(ubm);
+       
                // all values in the sample are zeros.
-               if (valsCount.size() == 0)
+               if( numVals == 0 )
                        return 1;
-               int[] freqCounts = getFreqCounts(valsCount);
-               float q = ((float) sampleSize) / nRows;
-               float _1MinusQ = 1 - q;
-               // Eq. 11
-               float duj1Fraction = ((float) sampleSize)
-                               / (sampleSize - _1MinusQ * freqCounts[0]);
-               float duj1 = duj1Fraction * valsCount.size();
-               // Eq. 16
-               float gamma = 0;
-               for (int i = 1; i <= freqCounts.length; i++) {
-                       gamma += i * (i - 1) * freqCounts[i - 1];
-               }
-               gamma *= duj1 / sampleSize / sampleSize;
-               gamma += duj1 / nRows - 1;
-               gamma = Math.max(gamma, 0);
-               int estimate;
                
-               if (gamma < HAAS_AND_STOKES_ALPHA1) {
-                       // UJ2 - begining of page 1479
-               //      System.out.println("uj2");
-                       estimate = (int) (duj1Fraction * (valsCount.size() - 
freqCounts[0]
-                                       * _1MinusQ * Math.log(_1MinusQ) * gamma 
/ q));
-               } else if (gamma < HAAS_AND_STOKES_ALPHA2) {
-                       // UJ2a - end of page 1998
-                       //System.out.println("uj2a");
-                       int numRemovedClasses = 0;
-                       float updatedNumRows = nRows;
-                       int updatedSampleSize = sampleSize;
-
-                       for (Integer cnt : valsCount.values()) {
-                               if (cnt > HAAS_AND_STOKES_UJ2A_C) {
-                                       numRemovedClasses++;
-                                       freqCounts[cnt - 1]--;
-                                       updatedSampleSize -= cnt;
-                                       /*
-                                        * To avoid solving Eq. 20 numerically 
for the class size in
-                                        * the full population (N_j), the 
current implementation
-                                        * just scales cnt (n_j) by the 
sampling ratio (q).
-                                        * Intuitively, the scaling should be 
fine since cnt is
-                                        * large enough. Also, N_j in Eq. 20 is 
lower-bounded by cnt
-                                        * which is already large enough to 
make the denominator in
-                                        * Eq. 20 very close to 1.
-                                        */
-                                       updatedNumRows -= ((float) cnt) / q;
-                               }
-                       }
-                       if (updatedSampleSize == 0) {
-                               // use uJ2a
-                               
-                               estimate = (int) (duj1Fraction * 
(valsCount.size() - freqCounts[0]
-                                               * (_1MinusQ) * 
Math.log(_1MinusQ) * gamma / q));
-                       } else {
-                               float updatedQ = ((float) updatedSampleSize) / 
updatedNumRows;
-                               int updatedSampleCardinality = valsCount.size()
-                                               - numRemovedClasses;
-                               float updatedDuj1Fraction = ((float) 
updatedSampleSize)
-                                               / (updatedSampleSize - (1 - 
updatedQ) * freqCounts[0]);
-                               float updatedDuj1 = updatedDuj1Fraction
-                                               * updatedSampleCardinality;
-                               float updatedGamma = 0;
-                               for (int i = 1; i <= freqCounts.length; i++) {
-                                       updatedGamma += i * (i - 1) * 
freqCounts[i - 1];
-                               }
-                               updatedGamma *= updatedDuj1 / updatedSampleSize
-                                               / updatedSampleSize;
-                               updatedGamma += updatedDuj1 / updatedNumRows - 
1;
-                               updatedGamma = Math.max(updatedGamma, 0);
-
-                               estimate = (int) (updatedDuj1Fraction * 
(updatedSampleCardinality - freqCounts[0]
-                                               * (1 - updatedQ)
-                                               * Math.log(1 - updatedQ)
-                                               * updatedGamma / updatedQ))
-                                               + numRemovedClasses;
-                       }
-
-               } else {
-                       // Sh3 - end of section 3
-                       float fraq1Numer = 0;
-                       float fraq1Denom = 0;
-                       float fraq2Numer = 0;
-                       float fraq2Denom = 0;
-                       for (int i = 1; i <= freqCounts.length; i++) {
-                               fraq1Numer += i * q * q * Math.pow(1 - q * q, i 
- 1)
-                                               * freqCounts[i - 1];
-                               fraq1Denom += Math.pow(_1MinusQ, i) * 
(Math.pow(1 + q, i) - 1)
-                                               * freqCounts[i - 1];
-                               fraq2Numer += Math.pow(_1MinusQ, i) * 
freqCounts[i - 1];
-                               fraq2Denom += i * q * Math.pow(_1MinusQ, i - 1)
-                                               * freqCounts[i - 1];
-                       }
-                       estimate = (int) (valsCount.size() + freqCounts[0] * 
fraq1Numer
-                                       / fraq1Denom * fraq2Numer * fraq2Numer 
/ fraq2Denom
-                                       / fraq2Denom);
-               }
-               return estimate < 1 ? 1 : estimate;
+               double q = ((double) sampleSize) / nRows;
+               double f1 = freqCounts[0];
+               
+               //compute basic Duj1 estimate
+               double duj1 = getDuj1Estimate(q, f1, sampleSize, numVals);
+               
+               //compute gamma based on Duj1
+               double gamma = getGammaSquared(duj1, freqCounts, sampleSize, 
nRows);
+               double d = -1;
+               
+               //core hybrid estimator based on gamma
+               if (gamma < HAAS_AND_STOKES_ALPHA1)
+                       d = getDuj2Estimate(q, f1, sampleSize, numVals, gamma);
+               else if (gamma < HAAS_AND_STOKES_ALPHA2)
+                       d = getDuj2aEstimate(q, freqCounts, sampleSize, 
numVals, gamma, nRows, solveCache);
+               else
+                       d = getSh3Estimate(q, freqCounts, numVals);
+               
+               //round and ensure min value 1
+               return Math.max(1, (int)Math.round(d));
        }
 
-       private static HashMap<DblArray, Integer> getValCounts(
-                       ReaderColumnSelection sampleRowsReader) 
+       private static HashMap<DblArray, Integer> 
getValCounts(ReaderColumnSelection sampleRowsReader) 
        {
                HashMap<DblArray, Integer> valsCount = new HashMap<DblArray, 
Integer>();
                DblArray val = null;
@@ -681,27 +562,179 @@ public class CompressedSizeEstimatorSample extends 
CompressedSizeEstimator
                        if (cnt == null)
                                cnt = 0;
                        cnt++;
-                       valsCount.put(val, cnt);
+                       valsCount.put(new DblArray(val), cnt);
                }
                return valsCount;
        }
 
-       private static int[] getFreqCounts(HashMap<DblArray, Integer> 
valsCount) 
+       /**
+        * Creates an inverted histogram, where freqCounts[i-1] indicates 
+        * how many values occurred with a frequency i. Note that freqCounts[0]
+        * represents the special case of the number of singletons (values that occur exactly once). 
+        * 
+        * @param ubm uncompressed bitmap
+        * @return frequency counts
+        */
+       private static int[] getFreqCounts(UncompressedBitmap ubm) 
        {
+               //determine max frequency
+               int numVals = ubm.getNumValues();
                int maxCount = 0;
-               for (Integer c : valsCount.values()) {
-                       if (c > maxCount)
-                               maxCount = c;
-               }
-               
-               /*
-                * freqCounts[i-1] = how many values occured with a frequecy i
-                */
+               for( int i=0; i<numVals; i++ )
+                       maxCount = Math.max(maxCount, ubm.getNumOffsets(i));
+                       
+               //create frequency histogram
                int[] freqCounts = new int[maxCount];
-               for (Integer c : valsCount.values()) {
-                       freqCounts[c - 1]++;
-               }
+               for( int i=0; i<numVals; i++ )
+                       freqCounts[ubm.getNumOffsets(i)-1] ++;
+
                return freqCounts;
 
        }
+
+       /**
+        * Computes the "unsmoothed first-order jackknife estimator" (Eq 11),
+        * i.e., dn / (1 - ((1-q) * f1) / n).
+        * 
+        * @param q sampling fraction (sample size divided by the number of rows)
+        * @param f1 number of values that occur exactly once in the sample
+        * @param n sample size
+        * @param dn number of distinct values observed in the sample
+        * @return estimated number of distinct values
+        */
+       private static double getDuj1Estimate(double q, double f1, int n, int 
dn) {
+               return dn / (1 - ((1-q) * f1)/n);
+       }
+       
+       /**
+        * Computes the "unsmoothed second-order jackknife estimator" (Eq 18b),
+        * i.e., (dn - (1-q) * f1 * ln(1-q) * gammaDuj1 / q) / (1 - ((1-q) * f1) / n).
+        * 
+        * NOTE(review): for q = 1 the product (1-q) * f1 * Math.log(1-q) is
+        * 0 * -Infinity and hence NaN in floating point - confirm that callers
+        * never pass a sampling fraction of 1.
+        * 
+        * @param q sampling fraction (sample size divided by the number of rows)
+        * @param f1 number of values that occur exactly once in the sample
+        * @param n sample size
+        * @param dn number of distinct values observed in the sample
+        * @param gammaDuj1 squared coefficient of variation, computed by the
+        *                  callers from the Duj1 estimate
+        * @return estimated number of distinct values
+        */
+       private static double getDuj2Estimate(double q, double f1, int n, int 
dn, double gammaDuj1) {
+               return (dn - (1-q) * f1 * Math.log(1-q) * gammaDuj1 / q) / (1 - 
((1-q) * f1)/n);
+       }
+       
+       /**
+        * Computes the "unsmoothed second-order jackknife estimator" with 
+        * additional stabilization procedure, which removes the classes whose 
+        * frequencies reach the cut-off c, computes Duj2 over the reduced sample, 
+        * and finally adds back the number of removed classes.
+        * 
+        * If the removed classes cover the entire sample, the method falls back
+        * to Duj2 over the full sample and leaves f untouched. Otherwise all
+        * entries of f for frequencies at or above the cut-off are set to 0,
+        * i.e., the array passed by the caller is modified in place.
+        * 
+        * @param q sampling fraction (sample size divided by the number of rows)
+        * @param f inverted frequency histogram, f[i-1] is the number of values
+        *          that occur with frequency i; may be modified (see above)
+        * @param n sample size
+        * @param dn number of distinct values observed in the sample
+        * @param gammaDuj1 squared coefficient of variation of the full sample,
+        *                  only used by the fallback to Duj2
+        * @param N number of rows the sample was drawn from
+        * @param solveCache cache handed to getMethodOfMomentsEstimate
+        * @return estimated number of distinct values
+        */
+       private static double getDuj2aEstimate(double q, int f[], int n, int 
dn, double gammaDuj1, int N, 
+                       HashMap<Integer, Double> solveCache) {
+               int c = HAAS_AND_STOKES_UJ2A_CUT2 ? 
+                       f.length/2+1 : HAAS_AND_STOKES_UJ2A_C+1; //classes with frequency >= c are removed
+               
+               //compute the number of sample rows (nB) and the number of classes
+               //(cardB) that are covered by the removed high-frequency classes
+               int nB = 0, cardB = 0;
+               for( int i=c; i<=f.length; i++ ) 
+                       if( f[i-1] != 0 ) {
+                               nB += f[i-1] * i; //numVals times frequency 
+                               cardB += f[i-1];
+                       }
+               
+               //fallback to Duj2 over full sample if only high frequency 
columns
+               if( n - nB == 0 )
+                       return getDuj2Estimate(q, f[0], n, dn, gammaDuj1);
+
+               //compute reduced population size, either by scaling the frequency with 1/q or via numeric solve
+               int updatedN = N; 
+               for( int i=c; i<=f.length; i++ )
+                       if( f[i-1] != 0 )
+                               updatedN -= f[i-1] * 
(!HAAS_AND_STOKES_UJ2A_SOLVE ? i/q :
+                                       getMethodOfMomentsEstimate(i, q, 1, N, 
solveCache)); //compound assignment narrows the double result back to int
+               
+               //remove classes that exceed a fixed frequency c
+               for( int i=c; i<=f.length; i++ )
+                       f[i-1] = 0; //modifies the caller's histogram in place
+               
+               //compute duj2a over reduced sample
+               double updatedDuj1 = getDuj1Estimate(q, f[0], n-nB, dn-cardB);
+               double updatedGammaDuj1 = getGammaSquared(updatedDuj1, f, n-nB, 
updatedN);
+               double duj2 = getDuj2Estimate(q, f[0], n-nB, dn-cardB, 
updatedGammaDuj1);
+               return duj2 + cardB;            
+       }
+       
+       /**
+        * Computes the "shlosser third-order estimator". (Eq 30b)
+        * 
+        * Note that this estimator can show anomalies with NaN as the results
+        * due to terms such as Math.pow(1+q, i) which exceed Double.MAX_VALUE
+        * even for moderately large i, e.g., q=0.05 at around 14K. For this
+        * reason the term (1-q)^i * ((1+q)^i - 1) is accumulated in the
+        * algebraically equivalent form (1-q^2)^i - (1-q)^i.
+        * 
+        * @param q sampling fraction (sample size divided by the number of rows)
+        * @param f inverted frequency histogram, f[i-1] is the number of values
+        *          that occur with frequency i
+        * @param dn number of distinct values observed in the sample
+        * @return estimated number of distinct values
+        */
+       private static double getSh3Estimate(double q, int[] f, double dn) {
+               double fraq11 = 0, fraq12 = 0, fraq21 = 0, fraq22 = 0;
+               for( int i=1; i<=f.length; i++ ) 
+                       if( f[i-1] != 0 ) {
+                               fraq11 += i * q*q * Math.pow(1 - q*q, i-1) * 
f[i-1];
+                               //NOTE: numerically unstable due to 
Math.pow(1+q, i) overflows
+                               //fraq12 += Math.pow(1 - q, i) * (Math.pow(1+q, 
i)-1) * f[i-1];
+                               fraq12 += (Math.pow(1 - q*q, i) - Math.pow(1 - 
q, i)) * f[i-1];
+                               fraq21 += Math.pow(1 - q, i) * f[i-1];
+                               fraq22 += i * q * Math.pow(1 - q, i-1) * f[i-1];
+                       }
+               return dn + f[0] * fraq11/fraq12 * Math.pow(fraq21/fraq22, 2); 
+       }
+       
+       /**
+        * Computes the "squared coefficient of variation" based on a given 
+        * initial estimate D (Eq 16): the sum of i*(i-1)*f[i-1] over all
+        * frequencies i is scaled by D/n^2, then D/N - 1 is added, and the
+        * result is bounded from below by 0.
+        * 
+        * NOTE(review): the product i * (i-1) * f[i-1] is evaluated in int
+        * arithmetic before it is added to the double accumulator, so it may
+        * overflow for very large frequencies - confirm that the frequencies
+        * seen here stay small enough.
+        * 
+        * @param D initial estimate of the number of distinct values
+        * @param f inverted frequency histogram, f[i-1] is the number of values
+        *          that occur with frequency i
+        * @param n sample size
+        * @param N population size (number of rows, or a reduced row count)
+        * @return squared coefficient of variation, at least 0
+        */
+       private static double getGammaSquared(double D, int[] f, int n, int N) {
+               double gamma = 0;
+               for( int i=1; i<=f.length; i++) 
+                       if( f[i-1] != 0 )
+                               gamma += i * (i-1) * f[i-1];
+               gamma *= D / n / n;
+               gamma += D / N - 1;
+               return Math.max(0, gamma);
+       }
+       
+       /**
+        * Solves the method-of-moments estimate numerically. We use a cache
+        * on the same observed instances in the sample as q is constant and
+        * min/max are chosen conservatively.
+        * 
+        * A cached result for nj is returned without solving again. New results
+        * are only added to the cache while it holds fewer than
+        * MAX_SOLVE_CACHE_SIZE entries; beyond that they are recomputed on
+        * every call.
+        * 
+        * @param nj observed frequency of a class in the sample (cache key)
+        * @param q sampling fraction
+        * @param min lower bound of the interval searched by the solver
+        * @param max upper bound of the interval searched by the solver
+        * @param solveCache cache of previously solved estimates per frequency;
+        *                   note that the key is nj only, so q, min and max are
+        *                   assumed to be the same for all calls that share a cache
+        * @return root of the MethodOfMomentsFunction for (nj, q)
+        */
+       private static double getMethodOfMomentsEstimate(int nj, double q, 
double min, double max, 
+               HashMap<Integer, Double> solveCache) {
+               if( solveCache.containsKey(nj) )
+                       return solveCache.get(nj);
+               
+               double est = UnivariateSolverUtils
+                       .solve(new MethodOfMomentsFunction(nj, q), min, max, 
1e-9); //1e-9 is presumably the solver's absolute accuracy - TODO confirm
+               
+               if( solveCache.size()<MAX_SOLVE_CACHE_SIZE )
+                       solveCache.put(nj, est);
+               
+               return est;
+       }
+       
+       /*
+        * Immutable pair of a uniformity critical value and the sample size it
+        * was computed for.
+        * 
+        * In the shlosserSmoothedJackknifeEstimator as long as the sample size did
+        * not change, we will have the same critical value each time the estimator
+        * is used (given that alpha is the same). We cache the critical value to
+        * avoid recomputing it in each call.
+        */
+       private static class CriticalValue {
+               public final double uniformityCriticalValue; //threshold the uniformity statistic is compared against
+               public final int usedSampleSize; //sample size the critical value was computed for
+               
+               public CriticalValue(double cv, int size) {
+                       uniformityCriticalValue = cv;
+                       usedSampleSize = size;
+               } 
+       }
+       
+       private static class MethodOfMomentsFunction implements 
UnivariateFunction { //f(x) = q*x / (1-(1-q)^x) - nj, handed to the numeric solver
+               private final int _nj; //observed frequency of a class in the sample
+               private final double _q; //sampling fraction
+               
+               public MethodOfMomentsFunction(int nj, double q) {
+                       _nj = nj;
+                       _q = q;
+               }
+               
+               @Override
+               public double value(double x) {
+                       return _q*x / (1-Math.pow(1-_q, x)) - _nj; //zero where q*x / (1-(1-q)^x) equals the observed frequency
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/37a215bc/src/main/java/org/apache/sysml/runtime/compress/estim/CompressedSizeInfo.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/compress/estim/CompressedSizeInfo.java 
b/src/main/java/org/apache/sysml/runtime/compress/estim/CompressedSizeInfo.java
index 430783d..60acdeb 100644
--- 
a/src/main/java/org/apache/sysml/runtime/compress/estim/CompressedSizeInfo.java
+++ 
b/src/main/java/org/apache/sysml/runtime/compress/estim/CompressedSizeInfo.java
@@ -19,51 +19,53 @@
 
 package org.apache.sysml.runtime.compress.estim;
 
+import org.apache.sysml.runtime.compress.CompressedMatrixBlock;
+
 /**
  * 
  * A helper reusable object for maintaining bitmap sizes
  */
 public class CompressedSizeInfo 
 {
-       private int _estCard = -1;
-       private long _rleSize = -1; 
-       private long _oleSize = -1;
-
-       public CompressedSizeInfo() {
-               
-       }
+       private final int _estCard;
+       private final int _estNnz;
+       private final long _rleSize; 
+       private final long _oleSize;
+       private final long _ddcSize;
 
-       public CompressedSizeInfo(int estCard, long rleSize, long oleSize) {
+       public CompressedSizeInfo(int estCard, int estNnz, long rleSize, long 
oleSize, long ddcSize) {
                _estCard = estCard;
+               _estNnz = estNnz;
                _rleSize = rleSize;
                _oleSize = oleSize;
+               _ddcSize = ddcSize;
        }
 
-       public void setRLESize(long rleSize) {
-               _rleSize = rleSize;
-       }
-       
        public long getRLESize() {
                return _rleSize;
        }
-       
-       public void setOLESize(long oleSize) {
-               _oleSize = oleSize;
-       }
 
        public long getOLESize() {
                return _oleSize;
        }
-
-       public long getMinSize() {
-               return Math.min(_rleSize, _oleSize);
+       
+       public long getDDCSize() {
+               return CompressedMatrixBlock.ALLOW_DDC_ENCODING ? 
+                       _ddcSize : Long.MAX_VALUE; 
        }
 
-       public void setEstCardinality(int estCard) {
-               _estCard = estCard;
+       public long getMinSize() {
+               return Math.min(Math.min(
+                       getRLESize(), 
+                       getOLESize()),
+                       getDDCSize());
        }
 
-       public int getEstCarinality() {
+       public int getEstCard() {
                return _estCard;
        }
+       
+       public int getEstNnz() {
+               return _estNnz;
+       }
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/37a215bc/src/main/java/org/apache/sysml/runtime/compress/estim/SizeEstimatorFactory.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/compress/estim/SizeEstimatorFactory.java
 
b/src/main/java/org/apache/sysml/runtime/compress/estim/SizeEstimatorFactory.java
index c142103..63a092c 100644
--- 
a/src/main/java/org/apache/sysml/runtime/compress/estim/SizeEstimatorFactory.java
+++ 
b/src/main/java/org/apache/sysml/runtime/compress/estim/SizeEstimatorFactory.java
@@ -19,14 +19,16 @@
 
 package org.apache.sysml.runtime.compress.estim;
 
+import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 
 public class SizeEstimatorFactory 
 {
-       public static final float SAMPLING_RATIO = 0.01f; //conservative default
+       public static final double SAMPLING_RATIO = 0.05; //conservative default
+       public static final boolean EXTRACT_SAMPLE_ONCE = true;
 
        @SuppressWarnings("unused")
-       public static CompressedSizeEstimator getSizeEstimator(MatrixBlock 
data, int numRows) {
+       public static CompressedSizeEstimator getSizeEstimator(MatrixBlock 
data, int numRows) throws DMLRuntimeException {
                return (SAMPLING_RATIO == 1.0) ?
                                new CompressedSizeEstimatorExact(data):
                                new CompressedSizeEstimatorSample(data, (int) 
(numRows*SAMPLING_RATIO));

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/37a215bc/src/main/java/org/apache/sysml/runtime/compress/utils/ConverterUtils.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/compress/utils/ConverterUtils.java 
b/src/main/java/org/apache/sysml/runtime/compress/utils/ConverterUtils.java
index 37b2984..f4d9f1c 100644
--- a/src/main/java/org/apache/sysml/runtime/compress/utils/ConverterUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/compress/utils/ConverterUtils.java
@@ -19,6 +19,7 @@
 
 package org.apache.sysml.runtime.compress.utils;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 
 import org.apache.sysml.runtime.compress.ColGroup;
@@ -70,4 +71,19 @@ public class ConverterUtils
                else 
                        return vector.getDenseBlock();
        }
+
+       public static MatrixBlock getUncompressedColBlock( ColGroup group ) //returns the data of the given column group as a MatrixBlock
+       {
+               MatrixBlock ret = null;
+               if( group instanceof ColGroupUncompressed ) {
+                       ret = ((ColGroupUncompressed) group).getData(); //already uncompressed: use its data block as is
+               }
+               else {
+                       ArrayList<ColGroup> tmpGroup = new 
ArrayList<ColGroup>(Arrays.asList(group)); //single-element list that holds only the given group
+                       ColGroupUncompressed decompressedCols = new 
ColGroupUncompressed(tmpGroup); //presumably decompresses the listed group - TODO confirm against ColGroupUncompressed
+                       ret = decompressedCols.getData();
+               }
+               
+               return ret;
+       }
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/37a215bc/src/main/java/org/apache/sysml/runtime/compress/utils/IntArrayList.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/compress/utils/IntArrayList.java 
b/src/main/java/org/apache/sysml/runtime/compress/utils/IntArrayList.java
index ef4d476..0f2f091 100644
--- a/src/main/java/org/apache/sysml/runtime/compress/utils/IntArrayList.java
+++ b/src/main/java/org/apache/sysml/runtime/compress/utils/IntArrayList.java
@@ -66,11 +66,18 @@ public class IntArrayList
                _size++;
        }
 
+       /**
+        * Returns the underlying array of offsets. Note that this array might 
be 
+        * physically larger than the actual length of the offset lists. Use 
size() 
+        * to obtain the actual length.
+        * 
+        * @return the underlying array of offsets, or a new single-element array if the size is 1
+        */
        public int[] extractValues() {
                if( _size == 1 )
                        return new int[] { _val0 };
                else
-                       return Arrays.copyOfRange(_data, 0, _size);
+                       return _data;
        }
 
        private void resize() {
@@ -80,8 +87,6 @@ public class IntArrayList
                                        "IntArrayList resize leads to integer 
overflow: size=" + _size);
 
                // resize data array and copy existing contents
-               int[] newdata = new int[_data.length * RESIZE_FACTOR];
-               System.arraycopy(_data, 0, newdata, 0, _size);
-               _data = newdata;
+               _data = Arrays.copyOf(_data, _data.length * RESIZE_FACTOR);
        }
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/37a215bc/src/main/java/org/apache/sysml/runtime/compress/utils/LinearAlgebraUtils.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/compress/utils/LinearAlgebraUtils.java 
b/src/main/java/org/apache/sysml/runtime/compress/utils/LinearAlgebraUtils.java
index 3bf0ad4..7a4a013 100644
--- 
a/src/main/java/org/apache/sysml/runtime/compress/utils/LinearAlgebraUtils.java
+++ 
b/src/main/java/org/apache/sysml/runtime/compress/utils/LinearAlgebraUtils.java
@@ -28,6 +28,86 @@ import org.apache.sysml.runtime.matrix.data.MatrixBlock;
  */
 public class LinearAlgebraUtils {
 
+       public static double dotProduct(double[] a, double[] b, final int len) 
+       { // returns the sum of a[i] * b[i] for i in [0, len)
+               double val = 0;
+               final int bn = len % 8;
+
+               // compute rest: the first len%8 elements, so that the remaining count is a multiple of 8
+               for (int i = 0; i < bn; i++)
+                       val += a[i] * b[i];
+
+               // unrolled 8-block (for better instruction-level parallelism)
+               for (int i = bn; i < len; i += 8) {
+                       // read 64B cachelines of a and b
+                       // compute val' = sum(a * b) + val
+                       val += a[i + 0] * b[i + 0] 
+                                + a[i + 1] * b[i + 1] 
+                                + a[i + 2] * b[i + 2] 
+                                + a[i + 3] * b[i + 3] 
+                                + a[i + 4] * b[i + 4]
+                                + a[i + 5] * b[i + 5] 
+                                + a[i + 6] * b[i + 6] 
+                                + a[i + 7] * b[i + 7];
+               }
+
+               // scalar result
+               return val;
+       }
+
+       public static double dotProduct( double[] a, double[] b, int ai, int 
bi, final int len )
+       { //returns the sum of a[ai+i] * b[bi+i] for i in [0, len)
+               double val = 0;
+               final int bn = len%8;
+                               
+               //compute rest: the first len%8 elements, so that the remaining count is a multiple of 8
+               for( int i = 0; i < bn; i++, ai++, bi++ )
+                       val += a[ ai ] * b[ bi ];
+               
+               //unrolled 8-block (for better instruction-level parallelism)
+               for( int i = bn; i < len; i+=8, ai+=8, bi+=8 )
+               {
+                       //read 64B cachelines of a and b
+                       //compute val' = sum(a * b) + val
+                       val += a[ ai+0 ] * b[ bi+0 ]
+                            + a[ ai+1 ] * b[ bi+1 ]
+                            + a[ ai+2 ] * b[ bi+2 ]
+                            + a[ ai+3 ] * b[ bi+3 ]
+                            + a[ ai+4 ] * b[ bi+4 ]
+                            + a[ ai+5 ] * b[ bi+5 ]
+                            + a[ ai+6 ] * b[ bi+6 ]
+                            + a[ ai+7 ] * b[ bi+7 ];
+               }
+               
+               //scalar result
+               return val; 
+       }
+
+       public static void vectAdd( double[] a, double[] c, int ai, int ci, 
final int len )
+       { //adds a[ai+i] onto c[ci+i] for i in [0, len)
+               final int bn = len%8;
+               
+               //rest, not aligned to 8-blocks (the first len%8 elements)
+               for( int j = 0; j < bn; j++, ai++, ci++)
+                       c[ ci ] += a[ ai ];
+               
+               //unrolled 8-block  (for better instruction-level parallelism)
+               for( int j = bn; j < len; j+=8, ai+=8, ci+=8) 
+               {
+                       //read 64B cachelines of a and c
+                       //compute c' = c + a
+                       //write back 64B cacheline of c = c'
+                       c[ ci+0 ] += a[ ai+0 ];
+                       c[ ci+1 ] += a[ ai+1 ];
+                       c[ ci+2 ] += a[ ai+2 ];
+                       c[ ci+3 ] += a[ ai+3 ];
+                       c[ ci+4 ] += a[ ai+4 ];
+                       c[ ci+5 ] += a[ ai+5 ];
+                       c[ ci+6 ] += a[ ai+6 ];
+                       c[ ci+7 ] += a[ ai+7 ];
+               }
+       }
+
        public static void vectAdd( final double aval, double[] c, char[] bix, 
final int bi, final int ci, final int len )
        {
                final int bn = len%8;
@@ -72,6 +152,53 @@ public class LinearAlgebraUtils {
                }
        }
 
+       public static void vectMultiplyAdd( final double aval, double[] b, 
double[] c, int[] bix, final int bi, final int ci, final int len )
+       { //c[ci+bix[j]] += aval * b[j] for j in [bi, len); note that len is used as exclusive end position, not as a count
+               final int bn = (len-bi)%8;
+               
+               //rest, not aligned to 8-blocks (the first (len-bi)%8 positions starting at bi)
+               for( int j = bi; j < bi+bn; j++ )
+                       c[ ci + bix[j] ] += aval * b[ j ];
+               
+               //unrolled 8-block (for better instruction-level parallelism)
+               for( int j = bi+bn; j < len; j+=8 )
+               {
+                       c[ ci+bix[j+0] ] += aval * b[ j+0 ];
+                       c[ ci+bix[j+1] ] += aval * b[ j+1 ];
+                       c[ ci+bix[j+2] ] += aval * b[ j+2 ];
+                       c[ ci+bix[j+3] ] += aval * b[ j+3 ];
+                       c[ ci+bix[j+4] ] += aval * b[ j+4 ];
+                       c[ ci+bix[j+5] ] += aval * b[ j+5 ];
+                       c[ ci+bix[j+6] ] += aval * b[ j+6 ];
+                       c[ ci+bix[j+7] ] += aval * b[ j+7 ];
+               }
+       }
+
+       public static void vectMultiplyAdd( final double aval, double[] b, 
double[] c, int bi, int ci, final int len )
+       { //c[ci+i] += aval * b[bi+i] for i in [0, len)
+               final int bn = len%8;
+               
+               //rest, not aligned to 8-blocks (the first len%8 elements)
+               for( int j = 0; j < bn; j++, bi++, ci++)
+                       c[ ci ] += aval * b[ bi ];
+               
+               //unrolled 8-block  (for better instruction-level parallelism)
+               for( int j = bn; j < len; j+=8, bi+=8, ci+=8) 
+               {
+                       //read 64B cachelines of b and c
+                       //compute c' = aval * b + c
+                       //write back 64B cacheline of c = c'
+                       c[ ci+0 ] += aval * b[ bi+0 ];
+                       c[ ci+1 ] += aval * b[ bi+1 ];
+                       c[ ci+2 ] += aval * b[ bi+2 ];
+                       c[ ci+3 ] += aval * b[ bi+3 ];
+                       c[ ci+4 ] += aval * b[ bi+4 ];
+                       c[ ci+5 ] += aval * b[ bi+5 ];
+                       c[ ci+6 ] += aval * b[ bi+6 ];
+                       c[ ci+7 ] += aval * b[ bi+7 ];
+               }
+       }
+
        public static double vectSum( double[] a, char[] bix, final int ai, 
final int bi, final int len )
        {
                double val = 0;
@@ -122,6 +249,18 @@ public class LinearAlgebraUtils {
                return val;
        }
 
+       public static void copyUpperToLowerTriangle( MatrixBlock ret )
+       { //mirrors the entries above the diagonal into the positions below it, in place
+               double[] c = ret.getDenseBlock(); //assumes a dense-allocated, square block - TODO confirm against callers
+               final int m = ret.getNumRows();
+               final int n = ret.getNumColumns();
+               
+               //copy symmetric values: c[j*n+i] = c[i*n+j] for all j > i
+               for( int i=0, uix=0; i<m; i++, uix+=n ) //uix is the offset of row i
+                       for( int j=i+1, lix=j*n+i; j<n; j++, lix+=n ) //lix is the position of cell (j,i)
+                               c[ lix ] = c[ uix+j ];
+       }
+
        public static void copyNonZerosToRowCol( MatrixBlock ret, MatrixBlock 
tmp, int ix )
        {
                for(int i=0; i<tmp.getNumColumns(); i++) {
@@ -132,4 +271,29 @@ public class LinearAlgebraUtils {
                        }
                }
        }
+       
+       /**
+        * Obtain the index of the closest element in a to the value x.
+        * <p>
+        * The search bisects the index range and, at each step, compares the
+        * distances of two neighboring elements to x. If both neighbors are equally
+        * close, the search continues to the right, so the higher index wins. An
+        * empty array yields -1.
+        * <p>
+        * NOTE(review): the bisection presumably expects a to be sorted so that the
+        * distance to x first shrinks and then grows along the array -- confirm
+        * against the callers.
+        * 
+        * @param a array of ints
+        * @param x value
+        * @return the index of the closest element in a to the value x
+        */
+       public static int getClosestK(int[] a, int x) {
+
+               int low = 0;
+               int high = a.length - 1;
+
+               while (low < high) {
+                       //unsigned shift keeps the midpoint valid even if low+high exceeds Integer.MAX_VALUE
+                       int mid = (low + high) >>> 1;
+                       //distances in long arithmetic: the int difference can overflow
+                       //(e.g., a[mid+1]=Integer.MAX_VALUE and x=-1) and would then compare as negative
+                       long d1 = Math.abs((long)a[mid] - x);
+                       long d2 = Math.abs((long)a[mid + 1] - x);
+                       if (d2 <= d1) {
+                               low = mid + 1;
+                       } else {
+                               high = mid;
+                       }
+               }
+               return high;
+       }
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/37a215bc/src/test/java/org/apache/sysml/test/integration/functions/compress/BasicCompressionTest.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/sysml/test/integration/functions/compress/BasicCompressionTest.java
 
b/src/test/java/org/apache/sysml/test/integration/functions/compress/BasicCompressionTest.java
index 2ec2f61..2d1b592 100644
--- 
a/src/test/java/org/apache/sysml/test/integration/functions/compress/BasicCompressionTest.java
+++ 
b/src/test/java/org/apache/sysml/test/integration/functions/compress/BasicCompressionTest.java
@@ -44,9 +44,10 @@ public class BasicCompressionTest extends AutomatedTestBase
        }
        
        public enum ValueType {
-               RAND,
-               RAND_ROUND,
-               CONST,
+               RAND, //UC
+               CONST, //RLE
+               RAND_ROUND_OLE, //OLE
+               RAND_ROUND_DDC, //DDC
        }
        
        @Override
@@ -70,13 +71,23 @@ public class BasicCompressionTest extends AutomatedTestBase
        }
        
        @Test
-       public void testDenseRoundRandDataCompression() {
-               runCompressionTest(SparsityType.DENSE, ValueType.RAND_ROUND, 
true);
+       public void testDenseRoundRandDataOLECompression() {
+               runCompressionTest(SparsityType.DENSE, 
ValueType.RAND_ROUND_OLE, true);
        }
        
        @Test
-       public void testSparseRoundRandDataCompression() {
-               runCompressionTest(SparsityType.SPARSE, ValueType.RAND_ROUND, 
true);
+       public void testSparseRoundRandDataOLECompression() {
+               runCompressionTest(SparsityType.SPARSE, 
ValueType.RAND_ROUND_OLE, true);
+       }
+       
+       @Test
+       public void testDenseRoundRandDataDDCCompression() {
+               runCompressionTest(SparsityType.DENSE, 
ValueType.RAND_ROUND_DDC, true);
+       }
+       
+       @Test
+       public void testSparseRoundRandDataDDCCompression() {
+               runCompressionTest(SparsityType.SPARSE, 
ValueType.RAND_ROUND_DDC, true);
        }
        
        @Test
@@ -105,13 +116,13 @@ public class BasicCompressionTest extends 
AutomatedTestBase
        }
        
        @Test
-       public void testDenseRoundRandDataNoCompression() {
-               runCompressionTest(SparsityType.DENSE, ValueType.RAND_ROUND, 
false);
+       public void testDenseRoundRandDataOLENoCompression() {
+               runCompressionTest(SparsityType.DENSE, 
ValueType.RAND_ROUND_OLE, false);
        }
        
        @Test
-       public void testSparseRoundRandDataNoCompression() {
-               runCompressionTest(SparsityType.SPARSE, ValueType.RAND_ROUND, 
false);
+       public void testSparseRoundRandDataOLENoCompression() {
+               runCompressionTest(SparsityType.SPARSE, 
ValueType.RAND_ROUND_OLE, false);
        }
        
        @Test
@@ -144,8 +155,10 @@ public class BasicCompressionTest extends AutomatedTestBase
                        //generate input data
                        double min = (vtype==ValueType.CONST)? 10 : -10;
                        double[][] input = TestUtils.generateTestMatrix(rows, 
cols, min, 10, sparsity, 7);
-                       if( vtype==ValueType.RAND_ROUND )
+                       if( vtype==ValueType.RAND_ROUND_OLE || 
vtype==ValueType.RAND_ROUND_DDC ) {
+                               CompressedMatrixBlock.ALLOW_DDC_ENCODING = 
(vtype==ValueType.RAND_ROUND_DDC);
                                input = TestUtils.round(input);
+                       }
                        MatrixBlock mb = 
DataConverter.convertToMatrixBlock(input);
                        
                        //compress given matrix block
@@ -164,5 +177,8 @@ public class BasicCompressionTest extends AutomatedTestBase
                catch(Exception ex) {
                        throw new RuntimeException(ex);
                }
+               finally {
+                       CompressedMatrixBlock.ALLOW_DDC_ENCODING = true;
+               }
        }
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/37a215bc/src/test/java/org/apache/sysml/test/integration/functions/compress/BasicGetValueTest.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/sysml/test/integration/functions/compress/BasicGetValueTest.java
 
b/src/test/java/org/apache/sysml/test/integration/functions/compress/BasicGetValueTest.java
index 0515acb..47c9fcc 100644
--- 
a/src/test/java/org/apache/sysml/test/integration/functions/compress/BasicGetValueTest.java
+++ 
b/src/test/java/org/apache/sysml/test/integration/functions/compress/BasicGetValueTest.java
@@ -46,9 +46,10 @@ public class BasicGetValueTest extends AutomatedTestBase
        }
        
        public enum ValueType {
-               RAND,
-               RAND_ROUND,
-               CONST,
+               RAND, //UC
+               CONST, //RLE
+               RAND_ROUND_OLE, //OLE
+               RAND_ROUND_DDC, //DDC
        }
        
        @Override
@@ -72,13 +73,23 @@ public class BasicGetValueTest extends AutomatedTestBase
        }
        
        @Test
-       public void testDenseRoundRandDataCompression() {
-               runGetValueTest(SparsityType.DENSE, ValueType.RAND_ROUND, true);
+       public void testDenseRoundRandDataOLECompression() {
+               runGetValueTest(SparsityType.DENSE, ValueType.RAND_ROUND_OLE, 
true);
        }
        
        @Test
-       public void testSparseRoundRandDataCompression() {
-               runGetValueTest(SparsityType.SPARSE, ValueType.RAND_ROUND, 
true);
+       public void testSparseRoundRandDataOLECompression() {
+               runGetValueTest(SparsityType.SPARSE, ValueType.RAND_ROUND_OLE, 
true);
+       }
+       
+       @Test
+       public void testDenseRoundRandDataDDCCompression() {
+               runGetValueTest(SparsityType.DENSE, ValueType.RAND_ROUND_DDC, 
true);
+       }
+       
+       @Test
+       public void testSparseRoundRandDataDDCCompression() {
+               runGetValueTest(SparsityType.SPARSE, ValueType.RAND_ROUND_DDC, 
true);
        }
        
        @Test
@@ -107,13 +118,13 @@ public class BasicGetValueTest extends AutomatedTestBase
        }
        
        @Test
-       public void testDenseRoundRandDataNoCompression() {
-               runGetValueTest(SparsityType.DENSE, ValueType.RAND_ROUND, 
false);
+       public void testDenseRoundRandDataOLENoCompression() {
+               runGetValueTest(SparsityType.DENSE, ValueType.RAND_ROUND_OLE, 
false);
        }
        
        @Test
-       public void testSparseRoundRandDataNoCompression() {
-               runGetValueTest(SparsityType.SPARSE, ValueType.RAND_ROUND, 
false);
+       public void testSparseRoundRandDataOLENoCompression() {
+               runGetValueTest(SparsityType.SPARSE, ValueType.RAND_ROUND_OLE, 
false);
        }
        
        @Test
@@ -146,8 +157,10 @@ public class BasicGetValueTest extends AutomatedTestBase
                        //generate input data
                        double min = (vtype==ValueType.CONST)? 10 : -10;
                        double[][] input = TestUtils.generateTestMatrix(rows, 
cols, min, 10, sparsity, 7);
-                       if( vtype==ValueType.RAND_ROUND )
+                       if( vtype==ValueType.RAND_ROUND_OLE || 
vtype==ValueType.RAND_ROUND_DDC ) {
+                               CompressedMatrixBlock.ALLOW_DDC_ENCODING = 
(vtype==ValueType.RAND_ROUND_DDC);
                                input = TestUtils.round(input);
+                       }
                        MatrixBlock mb = 
DataConverter.convertToMatrixBlock(input);
                        
                        //compress given matrix block
@@ -166,5 +179,8 @@ public class BasicGetValueTest extends AutomatedTestBase
                catch(Exception ex) {
                        throw new RuntimeException(ex);
                }
+               finally {
+                       CompressedMatrixBlock.ALLOW_DDC_ENCODING = true;
+               }
        }
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/37a215bc/src/test/java/org/apache/sysml/test/integration/functions/compress/BasicMatrixAppendTest.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/sysml/test/integration/functions/compress/BasicMatrixAppendTest.java
 
b/src/test/java/org/apache/sysml/test/integration/functions/compress/BasicMatrixAppendTest.java
index 93324b3..3bd6f0c 100644
--- 
a/src/test/java/org/apache/sysml/test/integration/functions/compress/BasicMatrixAppendTest.java
+++ 
b/src/test/java/org/apache/sysml/test/integration/functions/compress/BasicMatrixAppendTest.java
@@ -45,9 +45,10 @@ public class BasicMatrixAppendTest extends AutomatedTestBase
        }
        
        public enum ValueType {
-               RAND,
-               RAND_ROUND,
-               CONST,
+               RAND, //UC
+               CONST, //RLE
+               RAND_ROUND_OLE, //OLE
+               RAND_ROUND_DDC, //DDC
        }
        
        @Override
@@ -71,13 +72,23 @@ public class BasicMatrixAppendTest extends AutomatedTestBase
        }
        
        @Test
-       public void testDenseRoundRandDataCompression() {
-               runMatrixAppendTest(SparsityType.DENSE, ValueType.RAND_ROUND, 
true);
+       public void testDenseRoundRandDataOLECompression() {
+               runMatrixAppendTest(SparsityType.DENSE, 
ValueType.RAND_ROUND_OLE, true);
        }
        
        @Test
-       public void testSparseRoundRandDataCompression() {
-               runMatrixAppendTest(SparsityType.SPARSE, ValueType.RAND_ROUND, 
true);
+       public void testSparseRoundRandDataOLECompression() {
+               runMatrixAppendTest(SparsityType.SPARSE, 
ValueType.RAND_ROUND_OLE, true);
+       }
+       
+       @Test
+       public void testDenseRoundRandDataDDCCompression() {
+               runMatrixAppendTest(SparsityType.DENSE, 
ValueType.RAND_ROUND_DDC, true);
+       }
+       
+       @Test
+       public void testSparseRoundRandDataDDCCompression() {
+               runMatrixAppendTest(SparsityType.SPARSE, 
ValueType.RAND_ROUND_DDC, true);
        }
        
        @Test
@@ -106,13 +117,13 @@ public class BasicMatrixAppendTest extends 
AutomatedTestBase
        }
        
        @Test
-       public void testDenseRoundRandDataNoCompression() {
-               runMatrixAppendTest(SparsityType.DENSE, ValueType.RAND_ROUND, 
false);
+       public void testDenseRoundRandDataOLENoCompression() {
+               runMatrixAppendTest(SparsityType.DENSE, 
ValueType.RAND_ROUND_OLE, false);
        }
        
        @Test
-       public void testSparseRoundRandDataNoCompression() {
-               runMatrixAppendTest(SparsityType.SPARSE, ValueType.RAND_ROUND, 
false);
+       public void testSparseRoundRandDataOLENoCompression() {
+               runMatrixAppendTest(SparsityType.SPARSE, 
ValueType.RAND_ROUND_OLE, false);
        }
        
        @Test
@@ -145,8 +156,10 @@ public class BasicMatrixAppendTest extends 
AutomatedTestBase
                        //generate input data
                        double min = (vtype==ValueType.CONST)? 10 : -10;
                        double[][] input = TestUtils.generateTestMatrix(rows, 
cols1, min, 10, sparsity, 7);
-                       if( vtype==ValueType.RAND_ROUND )
+                       if( vtype==ValueType.RAND_ROUND_OLE || 
vtype==ValueType.RAND_ROUND_DDC ) {
+                               CompressedMatrixBlock.ALLOW_DDC_ENCODING = 
(vtype==ValueType.RAND_ROUND_DDC);
                                input = TestUtils.round(input);
+                       }
                        MatrixBlock mb = 
DataConverter.convertToMatrixBlock(input);
                        MatrixBlock vector = DataConverter.convertToMatrixBlock(
                                        TestUtils.generateTestMatrix(rows, 
cols2, 1, 1, 1.0, 3));
@@ -172,5 +185,8 @@ public class BasicMatrixAppendTest extends AutomatedTestBase
                catch(Exception ex) {
                        throw new RuntimeException(ex);
                }
+               finally {
+                       CompressedMatrixBlock.ALLOW_DDC_ENCODING = true;
+               }
        }
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/37a215bc/src/test/java/org/apache/sysml/test/integration/functions/compress/BasicMatrixMultChainTest.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/sysml/test/integration/functions/compress/BasicMatrixMultChainTest.java
 
b/src/test/java/org/apache/sysml/test/integration/functions/compress/BasicMatrixMultChainTest.java
index 8f17f91..fe46107 100644
--- 
a/src/test/java/org/apache/sysml/test/integration/functions/compress/BasicMatrixMultChainTest.java
+++ 
b/src/test/java/org/apache/sysml/test/integration/functions/compress/BasicMatrixMultChainTest.java
@@ -45,9 +45,10 @@ public class BasicMatrixMultChainTest extends 
AutomatedTestBase
        }
        
        public enum ValueType {
-               RAND,
-               RAND_ROUND,
-               CONST,
+               RAND, //UC
+               CONST, //RLE
+               RAND_ROUND_OLE, //OLE
+               RAND_ROUND_DDC, //DDC
        }
        
        @Override
@@ -71,13 +72,23 @@ public class BasicMatrixMultChainTest extends 
AutomatedTestBase
        }
        
        @Test
-       public void testDenseRoundRandDataNoWeightsCompression() {
-               runMatrixMultChainTest(SparsityType.DENSE, 
ValueType.RAND_ROUND, ChainType.XtXv, true);
+       public void testDenseRoundRandDataOLENoWeightsCompression() {
+               runMatrixMultChainTest(SparsityType.DENSE, 
ValueType.RAND_ROUND_OLE, ChainType.XtXv, true);
        }
        
        @Test
-       public void testSparseRoundRandDataNoWeightsCompression() {
-               runMatrixMultChainTest(SparsityType.SPARSE, 
ValueType.RAND_ROUND, ChainType.XtXv, true);
+       public void testSparseRoundRandDataOLENoWeightsCompression() {
+               runMatrixMultChainTest(SparsityType.SPARSE, 
ValueType.RAND_ROUND_OLE, ChainType.XtXv, true);
+       }
+       
+       @Test
+       public void testDenseRoundRandDataDDCNoWeightsCompression() {
+               runMatrixMultChainTest(SparsityType.DENSE, 
ValueType.RAND_ROUND_DDC, ChainType.XtXv, true);
+       }
+       
+       @Test
+       public void testSparseRoundRandDataDDCNoWeightsCompression() {
+               runMatrixMultChainTest(SparsityType.SPARSE, 
ValueType.RAND_ROUND_DDC, ChainType.XtXv, true);
        }
        
        @Test
@@ -106,13 +117,23 @@ public class BasicMatrixMultChainTest extends 
AutomatedTestBase
        }
        
        @Test
-       public void testDenseRoundRandDataNoWeightsNoCompression() {
-               runMatrixMultChainTest(SparsityType.DENSE, 
ValueType.RAND_ROUND, ChainType.XtXv, false);
+       public void testDenseRoundRandDataOLENoWeightsNoCompression() {
+               runMatrixMultChainTest(SparsityType.DENSE, 
ValueType.RAND_ROUND_OLE, ChainType.XtXv, false);
+       }
+       
+       @Test
+       public void testSparseRoundRandDataOLENoWeightsNoCompression() {
+               runMatrixMultChainTest(SparsityType.SPARSE, 
ValueType.RAND_ROUND_OLE, ChainType.XtXv, false);
+       }
+       
+       @Test
+       public void testDenseRoundRandDataDDCNoWeightsNoCompression() {
+               runMatrixMultChainTest(SparsityType.DENSE, 
ValueType.RAND_ROUND_DDC, ChainType.XtXv, false);
        }
        
        @Test
-       public void testSparseRoundRandDataNoWeightsNoCompression() {
-               runMatrixMultChainTest(SparsityType.SPARSE, 
ValueType.RAND_ROUND, ChainType.XtXv, false);
+       public void testSparseRoundRandDataDDCNoWeightsNoCompression() {
+               runMatrixMultChainTest(SparsityType.SPARSE, 
ValueType.RAND_ROUND_DDC, ChainType.XtXv, false);
        }
        
        @Test
@@ -141,13 +162,23 @@ public class BasicMatrixMultChainTest extends 
AutomatedTestBase
        }
        
        @Test
-       public void testDenseRoundRandDataWeightsCompression() {
-               runMatrixMultChainTest(SparsityType.DENSE, 
ValueType.RAND_ROUND, ChainType.XtwXv, true);
+       public void testDenseRoundRandDataOLEWeightsCompression() {
+               runMatrixMultChainTest(SparsityType.DENSE, 
ValueType.RAND_ROUND_OLE, ChainType.XtwXv, true);
+       }
+       
+       @Test
+       public void testSparseRoundRandDataOLEWeightsCompression() {
+               runMatrixMultChainTest(SparsityType.SPARSE, 
ValueType.RAND_ROUND_OLE, ChainType.XtwXv, true);
        }
        
        @Test
-       public void testSparseRoundRandDataWeightsCompression() {
-               runMatrixMultChainTest(SparsityType.SPARSE, 
ValueType.RAND_ROUND, ChainType.XtwXv, true);
+       public void testDenseRoundRandDataDDCWeightsCompression() {
+               runMatrixMultChainTest(SparsityType.DENSE, 
ValueType.RAND_ROUND_DDC, ChainType.XtwXv, true);
+       }
+       
+       @Test
+       public void testSparseRoundRandDataDDCWeightsCompression() {
+               runMatrixMultChainTest(SparsityType.SPARSE, 
ValueType.RAND_ROUND_DDC, ChainType.XtwXv, true);
        }
        
        @Test
@@ -176,13 +207,13 @@ public class BasicMatrixMultChainTest extends 
AutomatedTestBase
        }
        
        @Test
-       public void testDenseRoundRandDataWeightsNoCompression() {
-               runMatrixMultChainTest(SparsityType.DENSE, 
ValueType.RAND_ROUND, ChainType.XtwXv, false);
+       public void testDenseRoundRandDataOLEWeightsNoCompression() {
+               runMatrixMultChainTest(SparsityType.DENSE, 
ValueType.RAND_ROUND_OLE, ChainType.XtwXv, false);
        }
        
        @Test
-       public void testSparseRoundRandDataWeightsNoCompression() {
-               runMatrixMultChainTest(SparsityType.SPARSE, 
ValueType.RAND_ROUND, ChainType.XtwXv, false);
+       public void testSparseRoundRandDataOLEWeightsNoCompression() {
+               runMatrixMultChainTest(SparsityType.SPARSE, 
ValueType.RAND_ROUND_OLE, ChainType.XtwXv, false);
        }
        
        @Test
@@ -214,8 +245,10 @@ public class BasicMatrixMultChainTest extends 
AutomatedTestBase
                        //generate input data
                        double min = (vtype==ValueType.CONST)? 10 : -10;
                        double[][] input = TestUtils.generateTestMatrix(rows, 
cols, min, 10, sparsity, 7);
-                       if( vtype==ValueType.RAND_ROUND )
+                       if( vtype==ValueType.RAND_ROUND_OLE || 
vtype==ValueType.RAND_ROUND_DDC ) {
+                               CompressedMatrixBlock.ALLOW_DDC_ENCODING = 
(vtype==ValueType.RAND_ROUND_DDC);
                                input = TestUtils.round(input);
+                       }
                        MatrixBlock mb = 
DataConverter.convertToMatrixBlock(input);
                        MatrixBlock vector1 = 
DataConverter.convertToMatrixBlock(
                                        TestUtils.generateTestMatrix(cols, 1, 
0, 1, 1.0, 3));
@@ -241,5 +274,8 @@ public class BasicMatrixMultChainTest extends 
AutomatedTestBase
                catch(Exception ex) {
                        throw new RuntimeException(ex);
                }
+               finally {
+                       CompressedMatrixBlock.ALLOW_DDC_ENCODING = true;
+               }
        }
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/37a215bc/src/test/java/org/apache/sysml/test/integration/functions/compress/BasicMatrixTransposeSelfMultTest.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/sysml/test/integration/functions/compress/BasicMatrixTransposeSelfMultTest.java
 
b/src/test/java/org/apache/sysml/test/integration/functions/compress/BasicMatrixTransposeSelfMultTest.java
index ff2a103..c00f25f 100644
--- 
a/src/test/java/org/apache/sysml/test/integration/functions/compress/BasicMatrixTransposeSelfMultTest.java
+++ 
b/src/test/java/org/apache/sysml/test/integration/functions/compress/BasicMatrixTransposeSelfMultTest.java
@@ -45,9 +45,10 @@ public class BasicMatrixTransposeSelfMultTest extends 
AutomatedTestBase
        }
        
        public enum ValueType {
-               RAND,
-               RAND_ROUND,
-               CONST,
+               RAND, //UC
+               CONST, //RLE
+               RAND_ROUND_OLE, //OLE
+               RAND_ROUND_DDC, //DDC
        }
        
        @Override
@@ -71,13 +72,23 @@ public class BasicMatrixTransposeSelfMultTest extends 
AutomatedTestBase
        }
        
        @Test
-       public void testDenseRoundRandDataCompression() {
-               runTransposeSelfMatrixMultTest(SparsityType.DENSE, 
ValueType.RAND_ROUND, true);
+       public void testDenseRoundRandDataOLECompression() {
+               runTransposeSelfMatrixMultTest(SparsityType.DENSE, 
ValueType.RAND_ROUND_OLE, true);
        }
        
        @Test
-       public void testSparseRoundRandDataCompression() {
-               runTransposeSelfMatrixMultTest(SparsityType.SPARSE, 
ValueType.RAND_ROUND, true);
+       public void testSparseRoundRandDataOLECompression() {
+               runTransposeSelfMatrixMultTest(SparsityType.SPARSE, 
ValueType.RAND_ROUND_OLE, true);
+       }
+       
+       @Test
+       public void testDenseRoundRandDataDDCCompression() {
+               runTransposeSelfMatrixMultTest(SparsityType.DENSE, 
ValueType.RAND_ROUND_DDC, true);
+       }
+       
+       @Test
+       public void testSparseRoundRandDataDDCCompression() {
+               runTransposeSelfMatrixMultTest(SparsityType.SPARSE, 
ValueType.RAND_ROUND_DDC, true);
        }
        
        @Test
@@ -106,13 +117,13 @@ public class BasicMatrixTransposeSelfMultTest extends 
AutomatedTestBase
        }
        
        @Test
-       public void testDenseRoundRandDataNoCompression() {
-               runTransposeSelfMatrixMultTest(SparsityType.DENSE, 
ValueType.RAND_ROUND, false);
+       public void testDenseRoundRandDataOLENoCompression() {
+               runTransposeSelfMatrixMultTest(SparsityType.DENSE, 
ValueType.RAND_ROUND_OLE, false);
        }
        
        @Test
-       public void testSparseRoundRandDataNoCompression() {
-               runTransposeSelfMatrixMultTest(SparsityType.SPARSE, 
ValueType.RAND_ROUND, false);
+       public void testSparseRoundRandDataOLENoCompression() {
+               runTransposeSelfMatrixMultTest(SparsityType.SPARSE, 
ValueType.RAND_ROUND_OLE, false);
        }
        
        @Test
@@ -145,8 +156,10 @@ public class BasicMatrixTransposeSelfMultTest extends 
AutomatedTestBase
                        //generate input data
                        double min = (vtype==ValueType.CONST)? 10 : -10;
                        double[][] input = TestUtils.generateTestMatrix(rows, 
cols, min, 10, sparsity, 7);
-                       if( vtype==ValueType.RAND_ROUND )
+                       if( vtype==ValueType.RAND_ROUND_OLE || 
vtype==ValueType.RAND_ROUND_DDC ) {
+                               CompressedMatrixBlock.ALLOW_DDC_ENCODING = 
(vtype==ValueType.RAND_ROUND_DDC);
                                input = TestUtils.round(input);
+                       }
                        MatrixBlock mb = 
DataConverter.convertToMatrixBlock(input);
                        
                        //compress given matrix block
@@ -168,5 +181,8 @@ public class BasicMatrixTransposeSelfMultTest extends 
AutomatedTestBase
                catch(Exception ex) {
                        throw new RuntimeException(ex);
                }
+               finally {
+                       CompressedMatrixBlock.ALLOW_DDC_ENCODING = true;
+               }
        }
 }


Reply via email to