http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/16e7b1c8/src/main/java/org/apache/sysml/runtime/compress/PlanningCoCoder.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/PlanningCoCoder.java b/src/main/java/org/apache/sysml/runtime/compress/PlanningCoCoder.java
new file mode 100644
index 0000000..07d9757
--- /dev/null
+++ b/src/main/java/org/apache/sysml/runtime/compress/PlanningCoCoder.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.compress;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.TreeMap;
+
+import org.apache.sysml.runtime.compress.estim.CompressedSizeEstimator;
+
+public class PlanningCoCoder 
+{
+       //constants for weight computation
+       private final static float GROUPABILITY_THRESHOLD = 0.00064f;
+       private final static boolean USE_BIN_WEIGHT = false;
+       private final static float PARTITION_WEIGHT = 0.05F; //higher values lead to more grouping
+       private final static float PARTITION_SIZE = PARTITION_WEIGHT * GROUPABILITY_THRESHOLD;
+       private final static float BIN_WEIGHT_PARAM = -0.65f; //lower values lead to more grouping
+
+       /**
+        * Partitions the given columns into groups of columns to co-code.
+        * 
+        * @param sizeEstimator estimator for compressed column group sizes
+        * @param availCols indexes of the columns available for co-coding
+        * @param colsCardinalities number of distinct values per column
+        * @param compressedSize estimated compressed size per column
+        * @param numRows number of rows in the matrix
+        * @param sparsity sparsity of the matrix
+        * @return list of column index groups to co-code
+        */
+       public static List<int[]> findCocodesByPartitioning(CompressedSizeEstimator sizeEstimator, List<Integer> availCols, 
+                       List<Integer> colsCardinalities, List<Long> compressedSize, int numRows, double sparsity) 
+       {
+               float numRowsWeight = numRows;
+               List<int[]> retGroups = new ArrayList<int[]>();
+               // filtering out non-groupable columns as singleton groups
+               int numCols = availCols.size();
+               List<Integer> groupabaleCols = new ArrayList<Integer>();
+               // the weight of each column is the ratio of its cardinality to the number
+               // of rows scaled by the matrix sparsity
+               List<Float> groupabaleColWeights = new ArrayList<Float>();
+               HashMap<Integer, GroupableColInfo> groupableColsInfo = new HashMap<Integer, GroupableColInfo>();
+               for (int i = 0; i < numCols; i++) {
+                       int colIx = availCols.get(i);
+                       int cardinality = colsCardinalities.get(i);
+                       float weight = ((float) cardinality) / numRowsWeight;
+                       if (weight <= GROUPABILITY_THRESHOLD) {
+                               groupabaleCols.add(colIx);
+                               groupabaleColWeights.add(weight);
+                               groupableColsInfo.put(colIx, new GroupableColInfo(weight,
+                                               compressedSize.get(i)));
+                       } else {
+                               retGroups.add(new int[] { colIx });
+                       }
+               }
+               // bin packing based on PARTITION_WEIGHT and column weights
+               float weight = computeWeightForCoCoding(numRows, sparsity);
+               TreeMap<Float, List<List<Integer>>> bins = new PlanningBinPacker(
+                               weight, groupabaleCols, groupabaleColWeights) 
+                               .packFirstFit();
+
+               // brute force grouping within each partition
+               for (List<List<Integer>> binList : bins.values()) {
+                       for (List<Integer> bin : binList) {
+                               // building an array of singleton CoCodingGroup
+                               PlanningCoCodingGroup[] singltonGroups = new PlanningCoCodingGroup[bin.size()];
+                               int i = 0;
+                               GroupableColInfo colInfo;
+                               for (Integer col : bin) {
+                                       colInfo = groupableColsInfo.get(col);
+                                       singltonGroups[i++] = new PlanningCoCodingGroup(col, colInfo.size,
+                                                       colInfo.cardRatio);
+                               }
+                               PlanningCoCodingGroup[] outputGroups = findCocodesBruteForce(
+                                               sizeEstimator, numRowsWeight, singltonGroups);
+                               
+                               for (PlanningCoCodingGroup grp : outputGroups) {
+                                       retGroups.add(grp.getColIndices());
+                               }
+                       }
+               }
+               return retGroups;
+       }
+
+       /**
+        * Identify columns to code together. Uses a greedy approach that merges
+        * pairs of column groups into larger groups. Each phase of the greedy
+        * algorithm considers all combinations of pairs to merge.
+        * 
+        */
+       private static PlanningCoCodingGroup[] findCocodesBruteForce(
+                       CompressedSizeEstimator sizeEstimator, float numRowsWeight,
+                       PlanningCoCodingGroup[] singltonGroups) 
+       {
+               // Populate a priority queue with all available 2-column cocodings.
+               PriorityQueue<PlanningGroupMergeAction> q = new PriorityQueue<PlanningGroupMergeAction>();
+               for (int leftIx = 0; leftIx < singltonGroups.length; leftIx++) {
+                       PlanningCoCodingGroup leftGrp = singltonGroups[leftIx];
+                       for (int rightIx = leftIx + 1; rightIx < singltonGroups.length; rightIx++) {
+                               PlanningCoCodingGroup rightGrp = singltonGroups[rightIx];
+                               // at least one of the two groups should be low-cardinality
+                               float cardRatio = leftGrp.getCardinalityRatio() + rightGrp.getCardinalityRatio(); 
+                               if ( cardRatio < GROUPABILITY_THRESHOLD) {
+                                       PlanningGroupMergeAction potentialMerge = new PlanningGroupMergeAction(
+                                                       sizeEstimator, numRowsWeight, leftGrp, rightGrp);
+                                       if (potentialMerge.getChangeInSize() < 0) {
+                                               q.add(potentialMerge);
+                                       }
+                               }
+                       }
+               }
+               PlanningCoCodingGroup[] colGroups = singltonGroups;
+               
+               // Greedily merge groups until we can no longer reduce the number of
+               // runs by merging groups
+               while (q.size() > 0) {
+                       PlanningGroupMergeAction merge = q.poll();
+
+                       // The queue can contain merge actions involving column groups that
+                       // have already been merged.
+                       // Filter those actions out.
+                       int leftIx = findInArray(colGroups, merge.getLeftGrp());
+                       int rightIx = findInArray(colGroups, merge.getRightGrp());
+                       if (leftIx < 0 || rightIx < 0) {
+                               // One or more of the groups to be merged has already been made
+                               // part of another group.
+                               // Drop the merge action.
+                       } else {
+                               PlanningCoCodingGroup mergedGrp = merge.getMergedGrp();
+
+                               PlanningCoCodingGroup[] newColGroups = new PlanningCoCodingGroup[colGroups.length - 1];
+                               int targetIx = 0;
+                               for (int i = 0; i < colGroups.length; i++) {
+                                       if (i != leftIx && i != rightIx) {
+                                               newColGroups[targetIx] = colGroups[i];
+                                               targetIx++;
+                                       }
+                               }
+
+                               // New group goes at the end to (hopefully) speed up future
+                               // linear search operations
+                               newColGroups[newColGroups.length - 1] = mergedGrp;
+
+                               // Consider merging the new group with all the other
+                               // pre-existing groups.
+                                       for (int i = 0; i < newColGroups.length - 1; i++) {
+                                       PlanningCoCodingGroup newLeftGrp = newColGroups[i];
+                                       PlanningCoCodingGroup newRightGrp = mergedGrp;
+                                       if (newLeftGrp.getCardinalityRatio()
+                                                       + newRightGrp.getCardinalityRatio() < GROUPABILITY_THRESHOLD) {
+                                               PlanningGroupMergeAction newPotentialMerge = new PlanningGroupMergeAction(
+                                                               sizeEstimator, numRowsWeight, newLeftGrp,
+                                                               newRightGrp);
+                                               if (newPotentialMerge.getChangeInSize() < 0) {
+                                                       q.add(newPotentialMerge);
+                                               }
+                                       }
+                               }
+                               colGroups = newColGroups;
+                       }
+               }
+               return colGroups;
+       }
+
+       /**
+        * Computes the bin weight used for bin packing of groupable columns.
+        * 
+        * @param numRows number of rows in the matrix
+        * @param sparsity sparsity of the matrix
+        * @return weight (bin capacity) for co-coding bin packing
+        */
+       private static float computeWeightForCoCoding(int numRows, double sparsity)
+       {
+               if( USE_BIN_WEIGHT ) { //new method (non-conclusive)
+                       //return (float) Math.pow(numRows*sparsity,BIN_WEIGHT_PARAM);
+                       return (float) Math.pow(numRows,BIN_WEIGHT_PARAM);
+               }
+               else {
+                       return PARTITION_SIZE;
+               }
+       }
+       
+       /**
+        * 
+        * @param arr
+        * @param val
+        * @return
+        */
+       private static int findInArray(Object[] arr, Object val) {
+               for (int i = 0; i < arr.length; i++) {
+                       if (arr[i].equals(val)) {
+                               return i;
+                       }
+               }
+               return -1;
+       }
+       
+       /**
+        * Lightweight holder for a groupable column's cardinality ratio and estimated size.
+        */
+       private static class GroupableColInfo {
+               float cardRatio;
+               long size;
+
+               public GroupableColInfo(float lcardRatio, long lsize) {
+                       cardRatio = lcardRatio;
+                       size = lsize;
+               }
+       }
+}
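
For reference during review, a rough, self-contained sketch of how this planner could be driven end-to-end. The data and per-column statistics are hypothetical, the sketch glosses over CompressedMatrixBlock.TRANSPOSE_INPUT handling, and whether any columns actually get co-coded depends on GROUPABILITY_THRESHOLD relative to cardinality/numRows:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import org.apache.sysml.runtime.compress.PlanningCoCoder;
    import org.apache.sysml.runtime.compress.estim.CompressedSizeEstimator;
    import org.apache.sysml.runtime.compress.estim.CompressedSizeEstimatorExact;
    import org.apache.sysml.runtime.matrix.data.MatrixBlock;

    public class CoCodingPlanSketch {
        public static void main(String[] args) {
            // hypothetical 1000x3 dense matrix: two low-cardinality columns, one high-cardinality
            MatrixBlock data = new MatrixBlock(1000, 3, false);
            data.allocateDenseBlock();
            for (int i = 0; i < 1000; i++) {
                data.quickSetValue(i, 0, i % 2);
                data.quickSetValue(i, 1, i % 4);
                data.quickSetValue(i, 2, i);
            }
            data.recomputeNonZeros();

            // per-column statistics in the form the planner expects
            CompressedSizeEstimator est = new CompressedSizeEstimatorExact(data);
            List<Integer> cols = Arrays.asList(0, 1, 2);
            List<Integer> cards = Arrays.asList(2, 4, 1000);
            List<Long> sizes = new ArrayList<Long>();
            for (int c : cols) {
                long sz = est.estimateCompressedColGroupSize(new int[]{c}).getMinSize();
                sizes.add(sz);
            }

            // each int[] in the result is one group of columns to encode together
            List<int[]> plan = PlanningCoCoder.findCocodesByPartitioning(
                    est, cols, cards, sizes, 1000, 1.0);
            for (int[] grp : plan)
                System.out.println(Arrays.toString(grp));
        }
    }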

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/16e7b1c8/src/main/java/org/apache/sysml/runtime/compress/PlanningCoCodingGroup.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/PlanningCoCodingGroup.java b/src/main/java/org/apache/sysml/runtime/compress/PlanningCoCodingGroup.java
new file mode 100644
index 0000000..221e4ca
--- /dev/null
+++ b/src/main/java/org/apache/sysml/runtime/compress/PlanningCoCodingGroup.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.compress;
+
+import java.util.Arrays;
+
+import org.apache.sysml.runtime.compress.estim.CompressedSizeEstimator;
+import org.apache.sysml.runtime.compress.estim.CompressedSizeInfo;
+
+/** 
+ * Class to represent information about co-coding a group of columns. 
+ * 
+ */
+public class PlanningCoCodingGroup 
+{
+       private int[] _colIndexes;
+       private long _estSize;
+       private float _cardRatio;
+
+       /**
+        * Constructor for a one-column group; i.e. do not co-code a given column.
+        * 
+        */
+       public PlanningCoCodingGroup(int col, long estSize, float cardRatio) {
+               _colIndexes = new int[]{col};
+               _estSize = estSize;
+               _cardRatio = cardRatio;
+       }
+
+       /**
+        * Constructor for merging two disjoint groups of columns
+        * 
+        * @param grp1   first group of columns to merge
+        * @param grp2   second group to merge
+        * @param bitmapSizeEstimator   estimator used to size the merged group
+        * @param numRowsWeight numRows x sparsity
+        */
+       public PlanningCoCodingGroup(PlanningCoCodingGroup grp1, PlanningCoCodingGroup grp2,
+                       CompressedSizeEstimator bitmapSizeEstimator, float numRowsWeight) 
+       {
+               // merge sorted non-empty arrays
+               _colIndexes = new int[grp1._colIndexes.length + grp2._colIndexes.length];
+               int grp1Ptr = 0, grp2Ptr = 0;
+               for (int mergedIx = 0; mergedIx < _colIndexes.length; mergedIx++) {
+                       if (grp1._colIndexes[grp1Ptr] < grp2._colIndexes[grp2Ptr]) {
+                               _colIndexes[mergedIx] = grp1._colIndexes[grp1Ptr++];
+                               if (grp1Ptr == grp1._colIndexes.length) {
+                                       System.arraycopy(grp2._colIndexes, grp2Ptr, _colIndexes,
+                                                       mergedIx + 1, grp2._colIndexes.length - grp2Ptr);
+                                       break;
+                               }
+                       } else {
+                               _colIndexes[mergedIx] = grp2._colIndexes[grp2Ptr++];
+                               if (grp2Ptr == grp2._colIndexes.length) {
+                                       System.arraycopy(grp1._colIndexes, grp1Ptr, _colIndexes,
+                                                       mergedIx + 1, grp1._colIndexes.length - grp1Ptr);
+                                       break;
+                               }
+                       }
+               }
+               
+               // estimating size info
+               CompressedSizeInfo groupSizeInfo = bitmapSizeEstimator
+                               .estimateCompressedColGroupSize(_colIndexes);
+               _estSize = groupSizeInfo.getMinSize();
+               _cardRatio = groupSizeInfo.getEstCarinality() / numRowsWeight;
+       }
+
+       public int[] getColIndices() {
+               return _colIndexes;
+       }
+
+       /**
+        * @return estimated compressed size of the grouped columns
+        */
+       public long getEstSize() {
+               return _estSize;
+       }
+
+       public float getCardinalityRatio() {
+               return _cardRatio;
+       }
+
+       @Override
+       public String toString() {
+               return Arrays.toString(_colIndexes);
+       }
+}
\ No newline at end of file
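
The two-pointer merge in the second constructor is equivalent to the following standalone sketch (illustrative only; like the constructor, it assumes both index arrays are sorted, non-empty, and disjoint):

    static int[] mergeSortedIndexes(int[] a, int[] b) {
        int[] out = new int[a.length + b.length];
        int ai = 0, bi = 0;
        for (int oi = 0; oi < out.length; oi++) {
            // take from 'a' while its next index is smaller (or 'b' is exhausted)
            if (bi >= b.length || (ai < a.length && a[ai] < b[bi]))
                out[oi] = a[ai++];
            else
                out[oi] = b[bi++];
        }
        return out;
    }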

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/16e7b1c8/src/main/java/org/apache/sysml/runtime/compress/PlanningGroupMergeAction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/PlanningGroupMergeAction.java b/src/main/java/org/apache/sysml/runtime/compress/PlanningGroupMergeAction.java
new file mode 100644
index 0000000..5e3c6c5
--- /dev/null
+++ b/src/main/java/org/apache/sysml/runtime/compress/PlanningGroupMergeAction.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.compress;
+
+import org.apache.sysml.runtime.compress.estim.CompressedSizeEstimator;
+
+/**
+ * Internal data structure for tracking potential merges of column groups in
+ * co-coding calculations.
+ * 
+ */
+class PlanningGroupMergeAction implements Comparable<PlanningGroupMergeAction> 
+{
+       private PlanningCoCodingGroup _leftGrp;   //left input
+       private PlanningCoCodingGroup _rightGrp;  //right input
+       private PlanningCoCodingGroup _mergedGrp; //output
+       private long _changeInSize;
+
+       
+       public PlanningGroupMergeAction(CompressedSizeEstimator sizeEstimator,
+                       float numRowsWeight, PlanningCoCodingGroup leftGrp, PlanningCoCodingGroup rightGrp) {
+               _leftGrp = leftGrp;
+               _rightGrp = rightGrp;
+               _mergedGrp = new PlanningCoCodingGroup(leftGrp, rightGrp, sizeEstimator, numRowsWeight);
+
+               // Negative size change ==> Decrease in size
+               _changeInSize = _mergedGrp.getEstSize() 
+                               - leftGrp.getEstSize() - rightGrp.getEstSize();
+       }
+
+       public int compareTo(PlanningGroupMergeAction o) {
+               // We only sort by the change in size
+               return (int) Math.signum(_changeInSize - o._changeInSize);
+       }
+
+       @Override
+       public String toString() {
+               return String.format("Merge %s and %s", _leftGrp, _rightGrp);
+       }
+
+       public PlanningCoCodingGroup getLeftGrp() {
+               return _leftGrp;
+       }
+
+       public PlanningCoCodingGroup getRightGrp() {
+               return _rightGrp;
+       }
+
+       public PlanningCoCodingGroup getMergedGrp() {
+               return _mergedGrp;
+       }
+
+       public long getChangeInSize() {
+               return _changeInSize;
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/16e7b1c8/src/main/java/org/apache/sysml/runtime/compress/ReaderColumnSelection.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/ReaderColumnSelection.java b/src/main/java/org/apache/sysml/runtime/compress/ReaderColumnSelection.java
new file mode 100644
index 0000000..a37018f
--- /dev/null
+++ b/src/main/java/org/apache/sysml/runtime/compress/ReaderColumnSelection.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.compress;
+
+import org.apache.sysml.runtime.compress.utils.DblArray;
+
+/**
+ * Base class for all column selection readers.
+ * 
+ */
+public abstract class ReaderColumnSelection 
+{
+       protected int[] _colIndexes = null;
+       protected int _numRows = -1;
+       protected int _lastRow = -1;
+       protected boolean _skipZeros = false;
+       
+       protected ReaderColumnSelection(int[] colIndexes, int numRows, boolean skipZeros) {
+               _colIndexes = colIndexes;
+               _numRows = numRows;
+               _lastRow = -1;
+               _skipZeros = skipZeros;
+       }
+       
+       /**
+        * Gets the next row, null when no more rows.
+        * 
+        * @return the next row, or null when there are no more rows
+        */
+       public abstract DblArray nextRow();
+
+       /**
+        * 
+        * @return the row index of the row returned by the last call to nextRow()
+        */
+       public int getCurrentRowIndex() {
+               return _lastRow;
+       }
+       
+
+       /**
+        * Resets the reader to the first row.
+        */
+       public void reset() {
+               _lastRow = -1;
+       }
+}
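
Typical consumption pattern for these readers, shown as a sketch against the dense reader added below ('data' and 'colIndexes' are assumed inputs; note that the returned DblArray is a reused buffer):

    ReaderColumnSelection reader =
            new ReaderColumnSelectionDense(data, colIndexes, true /*skipZeros*/);
    for (DblArray tuple = reader.nextRow(); tuple != null; tuple = reader.nextRow()) {
        int row = reader.getCurrentRowIndex();
        // consume 'tuple' for 'row' here; copy it if it must outlive the next call
    }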

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/16e7b1c8/src/main/java/org/apache/sysml/runtime/compress/ReaderColumnSelectionDense.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/ReaderColumnSelectionDense.java b/src/main/java/org/apache/sysml/runtime/compress/ReaderColumnSelectionDense.java
new file mode 100644
index 0000000..d22f39d
--- /dev/null
+++ b/src/main/java/org/apache/sysml/runtime/compress/ReaderColumnSelectionDense.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.compress;
+
+import org.apache.sysml.runtime.compress.utils.DblArray;
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+
+public class ReaderColumnSelectionDense extends ReaderColumnSelection 
+{
+       protected MatrixBlock _data;
+       
+       // reusable return
+       private DblArray nonZeroReturn;
+       private DblArray reusableReturn;
+       private double[] reusableArr;
+
+       public ReaderColumnSelectionDense(MatrixBlock data, int[] colIndices, boolean skipZeros) {
+               super(colIndices, CompressedMatrixBlock.TRANSPOSE_INPUT ? 
+                               data.getNumColumns() : data.getNumRows(), skipZeros);
+               _data = data;
+               reusableArr = new double[colIndices.length];
+               reusableReturn = new DblArray(reusableArr);
+       }
+
+       @Override
+       public DblArray nextRow() {
+               if( _skipZeros) {
+                       while ((nonZeroReturn = getNextRow()) != null
+                               && DblArray.isZero(nonZeroReturn)); 
+                       return nonZeroReturn;
+               } else {
+                       return getNextRow();
+               }
+       }
+
+       /**
+        * 
+        * @return
+        */
+       private DblArray getNextRow() {
+               if(_lastRow == _numRows-1)
+                       return null;
+               _lastRow++;
+               for (int i = 0; i < _colIndexes.length; i++) {
+                       reusableArr[i] = CompressedMatrixBlock.TRANSPOSE_INPUT ? 
+                                       _data.quickGetValue( _colIndexes[i], _lastRow ) : 
+                                       _data.quickGetValue( _lastRow, _colIndexes[i] );
+               }
+               return reusableReturn;
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/16e7b1c8/src/main/java/org/apache/sysml/runtime/compress/ReaderColumnSelectionDenseSample.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/ReaderColumnSelectionDenseSample.java b/src/main/java/org/apache/sysml/runtime/compress/ReaderColumnSelectionDenseSample.java
new file mode 100644
index 0000000..06518e4
--- /dev/null
+++ b/src/main/java/org/apache/sysml/runtime/compress/ReaderColumnSelectionDenseSample.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.compress;
+
+import org.apache.sysml.runtime.compress.utils.DblArray;
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+
+/**
+ * Dense column selection reader that considers only a given subset of row indexes (a sample).
+ */
+public class ReaderColumnSelectionDenseSample extends ReaderColumnSelection 
+{
+       protected MatrixBlock _data;
+       
+       private int[] _sampleIndexes;
+       private int lastIndex = -1;
+
+       // reusable return
+       private DblArray nonZeroReturn;
+       private DblArray reusableReturn;
+       private double[] reusableArr;
+
+       public ReaderColumnSelectionDenseSample(MatrixBlock data, int[] colIndexes, int[] sampleIndexes, boolean skipZeros) 
+       {
+               super(colIndexes, -1, skipZeros);
+               _data = data;
+               _sampleIndexes = sampleIndexes;
+               reusableArr = new double[colIndexes.length];
+               reusableReturn = new DblArray(reusableArr);
+       }
+
+       @Override
+       public DblArray nextRow() {
+               if (_skipZeros) {
+                       while ((nonZeroReturn = getNextRow()) != null
+                                       && DblArray.isZero(nonZeroReturn));
+                       return nonZeroReturn;
+               } else {
+                       return getNextRow();
+               }
+       }
+
+       /**
+        * 
+        * @return
+        */
+       private DblArray getNextRow() {
+               if (lastIndex == _sampleIndexes.length - 1)
+                       return null;
+               lastIndex++;
+               for (int i = 0; i < _colIndexes.length; i++) {
+                       reusableArr[i] = CompressedMatrixBlock.TRANSPOSE_INPUT ? 
+                                       _data.quickGetValue(_colIndexes[i], _sampleIndexes[lastIndex]) :
+                                       _data.quickGetValue(_sampleIndexes[lastIndex], _colIndexes[i]);
+               }
+               return reusableReturn;
+       }
+
+       @Override
+       public int getCurrentRowIndex() {
+               return _sampleIndexes[lastIndex];
+       }
+       
+       @Override
+       public void reset() {
+               lastIndex = -1;
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/16e7b1c8/src/main/java/org/apache/sysml/runtime/compress/ReaderColumnSelectionSparse.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/ReaderColumnSelectionSparse.java b/src/main/java/org/apache/sysml/runtime/compress/ReaderColumnSelectionSparse.java
new file mode 100644
index 0000000..d2ef5a4
--- /dev/null
+++ b/src/main/java/org/apache/sysml/runtime/compress/ReaderColumnSelectionSparse.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.compress;
+
+import java.util.Arrays;
+
+import org.apache.sysml.runtime.compress.utils.DblArray;
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+import org.apache.sysml.runtime.matrix.data.SparseRow;
+
+/**
+ * Used to extract the values at certain indexes from each row in a sparse
+ * matrix
+ * 
+ * Keeps returning all-zeros arrays until reaching the last possible index. The
+ * current compression algorithm treats the zero-value in a sparse matrix like
+ * any other value.
+ */
+public class ReaderColumnSelectionSparse extends ReaderColumnSelection 
+{
+       private final DblArray ZERO_DBL_ARRAY;
+       private DblArray nonZeroReturn;
+
+       // reusable return
+       private DblArray reusableReturn;
+       private double[] reusableArr;
+
+       // current sparse row positions
+       private SparseRow[] sparseCols = null;
+       private int[] sparsePos = null;
+       
+       public ReaderColumnSelectionSparse(MatrixBlock data, int[] colIndexes, boolean skipZeros) 
+       {
+               super(colIndexes, CompressedMatrixBlock.TRANSPOSE_INPUT ? 
+                               data.getNumColumns() : data.getNumRows(), skipZeros);
+               ZERO_DBL_ARRAY = new DblArray(new double[colIndexes.length], true);
+               reusableArr = new double[colIndexes.length];
+               reusableReturn = new DblArray(reusableArr);
+               
+               if( !CompressedMatrixBlock.TRANSPOSE_INPUT ){
+                       throw new RuntimeException("SparseColumnSelectionReader 
should not be used without transposed input.");
+               }
+               
+               sparseCols = new SparseRow[colIndexes.length];
+               sparsePos = new int[colIndexes.length];
+               if( data.getSparseBlock()!=null )
+               for( int i=0; i<colIndexes.length; i++ )
+                       sparseCols[i] = data.getSparseBlock().get(colIndexes[i]);
+               Arrays.fill(sparsePos, 0);
+       }
+
+       @Override
+       public DblArray nextRow() {
+               if(_skipZeros) {
+                       while ((nonZeroReturn = getNextRow()) != null
+                               && nonZeroReturn == ZERO_DBL_ARRAY);
+                       return nonZeroReturn;
+               } else {
+                       return getNextRow();
+               }
+       }
+
+       /**
+        * 
+        * @return
+        */
+       private DblArray getNextRow() 
+       {
+               if(_lastRow == _numRows-1)
+                       return null;
+               _lastRow++;
+               
+               if( !CompressedMatrixBlock.TRANSPOSE_INPUT ){
+                       throw new RuntimeException("SparseColumnSelectionReader 
should not be used without transposed input.");
+               }
+               
+               //move pos to current row if necessary (for all columns)
+               for( int i=0; i<_colIndexes.length; i++ )
+                       if( sparseCols[i] != null && (sparseCols[i].indexes().length<=sparsePos[i] 
+                               || sparseCols[i].indexes()[sparsePos[i]]<_lastRow) )
+                       {
+                               sparsePos[i]++;
+                       }
+               
+               //extract current values
+               Arrays.fill(reusableArr, 0);
+               boolean zeroResult = true;
+               for( int i=0; i<_colIndexes.length; i++ ) 
+                       if( sparseCols[i] != null && sparseCols[i].indexes().length>sparsePos[i]
+                               && sparseCols[i].indexes()[sparsePos[i]]==_lastRow )
+                       {
+                               reusableArr[i] = sparseCols[i].values()[sparsePos[i]];
+                               zeroResult = false;
+                       }
+
+               return zeroResult ? ZERO_DBL_ARRAY : reusableReturn;
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/16e7b1c8/src/main/java/org/apache/sysml/runtime/compress/UncompressedBitmap.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/UncompressedBitmap.java b/src/main/java/org/apache/sysml/runtime/compress/UncompressedBitmap.java
new file mode 100644
index 0000000..971f438
--- /dev/null
+++ b/src/main/java/org/apache/sysml/runtime/compress/UncompressedBitmap.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.compress;
+
+import java.util.Arrays;
+
+import org.apache.sysml.runtime.compress.utils.DblArrayIntListHashMap;
+import org.apache.sysml.runtime.compress.utils.DoubleIntListHashMap;
+import org.apache.sysml.runtime.compress.utils.DblArrayIntListHashMap.DArrayIListEntry;
+import org.apache.sysml.runtime.compress.utils.DoubleIntListHashMap.DIListEntry;
+
+/** 
+ * Uncompressed representation of one or more columns in bitmap format. 
+ * 
+ */
+public final class UncompressedBitmap 
+{
+       private int _numCols;
+
+       /** Distinct values that appear in the column. Linearized as value groups <v11 v12> <v21 v22>.*/
+       private double[] _values;
+
+       /** Bitmaps (as lists of offsets) for each of the values. */
+       private int[][] _offsetsLists;
+
+       public UncompressedBitmap( DblArrayIntListHashMap distinctVals, int numColumns ) 
+       {
+               // added for one pass bitmap construction
+               // Convert inputs to arrays
+               int numVals = distinctVals.size();
+               _values = new double[numVals*numColumns];
+               _offsetsLists = new int[numVals][];
+               int bitmapIx = 0;
+               for( DArrayIListEntry val : distinctVals.extractValues()) {
+                       System.arraycopy(val.key.getData(), 0, _values, bitmapIx*numColumns, numColumns);
+                       _offsetsLists[bitmapIx++] = val.value.extractValues();
+               }
+               _numCols = numColumns;
+       }
+
+       public UncompressedBitmap( DoubleIntListHashMap distinctVals ) 
+       {
+               // added for one pass bitmap construction
+               // Convert inputs to arrays
+               int numVals = distinctVals.size();
+               _values = new double[numVals];
+               _offsetsLists = new int[numVals][];
+               int bitmapIx = 0;
+               for(DIListEntry val : distinctVals.extractValues()) {
+                       _values[bitmapIx] = val.key;
+                       _offsetsLists[bitmapIx++] = val.value.extractValues();
+               }
+               _numCols = 1;
+       }
+       
+       public int getNumColumns() {
+               return _numCols;
+       }
+
+       /**
+        * @param ix   index of a particular distinct value
+        * @return the tuple of column values associated with the specified index
+        */
+       public double[] getValues(int ix) {
+               return Arrays.copyOfRange(_values, ix*_numCols, (ix+1)*_numCols);
+       }
+
+       /**
+        * @return number of distinct values in the column; this number is also the
+        *         number of bitmaps, since there is one bitmap per value
+        */
+       public int getNumValues() {
+               return _values.length / _numCols;
+       }
+
+       /**
+        * @param ix   index of a particular distinct value
+        * @return IMMUTABLE array of the offsets of the rows containing the value
+        *         with the indicated index
+        */
+       public int[] getOffsetsList(int ix) {
+               return _offsetsLists[ix];
+       }
+}
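
For reference, a sketch of how a consumer could materialize an UncompressedBitmap back into per-row value tuples (illustrative only; numRows is assumed to be known by the caller):

    static double[][] materialize(UncompressedBitmap ubm, int numRows) {
        double[][] rows = new double[numRows][ubm.getNumColumns()];
        for (int vi = 0; vi < ubm.getNumValues(); vi++) {
            double[] tuple = ubm.getValues(vi);        // distinct value tuple vi
            for (int off : ubm.getOffsetsList(vi))     // rows where that tuple occurs
                rows[off] = tuple.clone();
        }
        return rows;
    }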

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/16e7b1c8/src/main/java/org/apache/sysml/runtime/compress/estim/CompressedSizeEstimator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/estim/CompressedSizeEstimator.java b/src/main/java/org/apache/sysml/runtime/compress/estim/CompressedSizeEstimator.java
new file mode 100644
index 0000000..1a1ae55
--- /dev/null
+++ b/src/main/java/org/apache/sysml/runtime/compress/estim/CompressedSizeEstimator.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.compress.estim;
+
+import org.apache.sysml.runtime.compress.BitmapEncoder;
+import org.apache.sysml.runtime.compress.UncompressedBitmap;
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+
+/**
+ * Base class for all compressed size estimators
+ */
+public abstract class CompressedSizeEstimator 
+{
+       protected MatrixBlock _data;
+
+       public CompressedSizeEstimator(MatrixBlock data) {
+               _data = data;
+       }
+
+       /**
+        * Estimates the compressed size of a column group over the underlying data.
+        * 
+        * @param colIndexes column indexes of the group
+        * @return estimated compressed size information for the group
+        */
+       public abstract CompressedSizeInfo estimateCompressedColGroupSize(int[] colIndexes);
+
+       /**
+        * Estimates the compressed size of a column group from its uncompressed bitmap.
+        * 
+        * @param ubm uncompressed bitmap of the column group
+        * @return estimated compressed size information for the group
+        */
+       public abstract CompressedSizeInfo estimateCompressedColGroupSize(UncompressedBitmap ubm);
+
+       /**
+        * Computes the raw size estimation factors (value tuples, OLE segments/offsets,
+        * RLE runs, singletons) of an uncompressed bitmap.
+        * 
+        * @param ubm uncompressed bitmap of the column group
+        * @param inclRLE whether to also count RLE runs
+        * @return size estimation factors
+        */
+       protected SizeEstimationFactors computeSizeEstimationFactors(UncompressedBitmap ubm, boolean inclRLE) {
+               int numVals = ubm.getNumValues();
+               int numRuns = 0;
+               int numOffs = 0;
+               int numSegs = 0;
+               int numSingle = 0;
+               
+               //compute size estimation factors
+               for (int i = 0; i < numVals; i++) {
+                       int[] list = ubm.getOffsetsList(i);
+                       numOffs += list.length;
+                       numSegs += list[list.length - 1] / BitmapEncoder.BITMAP_BLOCK_SZ + 1;
+                       numSingle += (list.length==1) ? 1 : 0;
+                       if( inclRLE ) {
+                               int lastOff = -2;
+                               for (int j = 0; j < list.length; j++) {
+                                       if (list[j] != lastOff + 1)
+                                               numRuns++;
+                                       lastOff = list[j];
+                               }
+                       }
+               }
+               
+               //construct estimation factors
+               return new SizeEstimationFactors(numVals, numSegs, numOffs, numRuns, numSingle);
+       }
+
+       /**
+        * Estimates the number of bytes needed to encode this column group
+        * in RLE encoding format.
+        * 
+        * @param numVals number of distinct value tuples
+        * @param numRuns number of RLE runs
+        * @param numCols number of columns in the group
+        * @return estimated RLE size in bytes
+        */
+       protected static long getRLESize(int numVals, int numRuns, int numCols) {
+               int ret = 0;
+               //distinct value tuples [double per col]
+               ret += 8 * numVals * numCols;
+               //offset/len fields per distinct value tuple [2xint]
+               ret += 8 * numVals;
+               //run data [2xchar]
+               ret += 4 * numRuns;
+               return ret;
+       }
+
+       /**
+        * Estimates the number of bytes needed to encode this column group 
+        * in OLE format.
+        * 
+        * @param numVals number of distinct value tuples
+        * @param numOffs number of offsets (non-zero cells)
+        * @param numSeqs number of OLE segments
+        * @param numCols number of columns in the group
+        * @return estimated OLE size in bytes
+        */
+       protected static long getOLESize(int numVals, float numOffs, int numSeqs, int numCols) {
+               int ret = 0;
+               //distinct value tuples [double per col]
+               ret += 8 * numVals * numCols;
+               //offset/len fields per distinct value tuple [2xint]
+               ret += 8 * numVals;
+               //offset list data [1xchar]
+               ret += 2 * numOffs;
+               //offset list segment headers [1xchar]
+               ret += 2 * numSeqs;
+               return ret;
+       }
+       
+       /**
+        * Raw statistics used by the OLE and RLE size estimates.
+        */
+       protected static class SizeEstimationFactors {
+               protected int numVals;   //num value tuples
+               protected int numSegs;   //num OLE segments 
+               protected int numOffs;   //num OLE offsets
+               protected int numRuns;   //num RLE runs
+               protected int numSingle; //num singletons
+               
+               protected SizeEstimationFactors(int numvals, int numsegs, int numoffs, int numruns, int numsingle) {
+                       numVals = numvals;
+                       numSegs = numsegs;
+                       numOffs = numoffs;
+                       numRuns = numruns;
+                       numSingle = numsingle;
+               }
+       }
+}
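
As a quick sanity check of the two size formulas above (hypothetical numbers): a group of 2 columns with 10 distinct value tuples, 100 RLE runs, 1000 OLE offsets, and 20 OLE segments is estimated at 8*10*2 + 8*10 + 4*100 = 640 bytes in RLE format and 8*10*2 + 8*10 + 2*1000 + 2*20 = 2,280 bytes in OLE format.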

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/16e7b1c8/src/main/java/org/apache/sysml/runtime/compress/estim/CompressedSizeEstimatorExact.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/estim/CompressedSizeEstimatorExact.java b/src/main/java/org/apache/sysml/runtime/compress/estim/CompressedSizeEstimatorExact.java
new file mode 100644
index 0000000..557c518
--- /dev/null
+++ b/src/main/java/org/apache/sysml/runtime/compress/estim/CompressedSizeEstimatorExact.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.compress.estim;
+
+import org.apache.sysml.runtime.compress.BitmapEncoder;
+import org.apache.sysml.runtime.compress.UncompressedBitmap;
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+
+/**
+ * Exact compressed size estimator (examines entire dataset).
+ * 
+ */
+public class CompressedSizeEstimatorExact extends CompressedSizeEstimator 
+{
+       public CompressedSizeEstimatorExact(MatrixBlock data) {
+               super(data);
+       }
+
+       @Override
+       public CompressedSizeInfo estimateCompressedColGroupSize(int[] colIndexes) {
+               return estimateCompressedColGroupSize(
+                       BitmapEncoder.extractBitmap(colIndexes, _data));
+       }
+
+       @Override
+       public CompressedSizeInfo estimateCompressedColGroupSize(UncompressedBitmap ubm) 
+       {
+               //compute size estimation factors
+               SizeEstimationFactors fact = computeSizeEstimationFactors(ubm, true);
+               
+               //construct new size info summary
+               return new CompressedSizeInfo(fact.numVals,
+                               getRLESize(fact.numVals, fact.numRuns, ubm.getNumColumns()),
+                               getOLESize(fact.numVals, fact.numOffs, fact.numSegs, ubm.getNumColumns()));
+       }
+}
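
A hedged sketch of how a caller might choose between the two estimators; the row threshold and sample size below are illustrative, not taken from this patch ('data' and 'numRows' are assumed inputs):

    CompressedSizeEstimator est = (numRows > 100000) ?
            new CompressedSizeEstimatorSample(data, 10000) :  // sample-based for large inputs
            new CompressedSizeEstimatorExact(data);           // exact scan for small inputs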

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/16e7b1c8/src/main/java/org/apache/sysml/runtime/compress/estim/CompressedSizeEstimatorSample.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/estim/CompressedSizeEstimatorSample.java b/src/main/java/org/apache/sysml/runtime/compress/estim/CompressedSizeEstimatorSample.java
new file mode 100644
index 0000000..76a0f06
--- /dev/null
+++ b/src/main/java/org/apache/sysml/runtime/compress/estim/CompressedSizeEstimatorSample.java
@@ -0,0 +1,767 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.compress.estim;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+
+import org.apache.commons.math3.distribution.ChiSquaredDistribution;
+import org.apache.commons.math3.random.RandomDataGenerator;
+import org.apache.sysml.hops.OptimizerUtils;
+import org.apache.sysml.runtime.compress.BitmapEncoder;
+import org.apache.sysml.runtime.compress.ReaderColumnSelection;
+import org.apache.sysml.runtime.compress.CompressedMatrixBlock;
+import org.apache.sysml.runtime.compress.ReaderColumnSelectionDense;
+import org.apache.sysml.runtime.compress.ReaderColumnSelectionDenseSample;
+import org.apache.sysml.runtime.compress.ReaderColumnSelectionSparse;
+import org.apache.sysml.runtime.compress.UncompressedBitmap;
+import org.apache.sysml.runtime.compress.utils.DblArray;
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+
+public class CompressedSizeEstimatorSample extends CompressedSizeEstimator 
+{
+       private static final boolean CORRECT_NONZERO_ESTIMATE = false; //TODO enable for production
+       private final static double SHLOSSER_JACKKNIFE_ALPHA = 0.975;
+       public static final float HAAS_AND_STOKES_ALPHA1 = 0.9F; //0.9 recommended in paper
+       public static final float HAAS_AND_STOKES_ALPHA2 = 30F; //30 recommended in paper
+       public static final float HAAS_AND_STOKES_UJ2A_C = 50; //50 recommended in paper
+
+       private int[] _sampleRows = null;
+       private RandomDataGenerator _rng = null;
+       private int _numRows = -1;
+       
+       /**
+        * 
+        * @param data matrix block to estimate compressed sizes for
+        * @param sampleRows sorted row indexes of the sample
+        */
+       public CompressedSizeEstimatorSample(MatrixBlock data, int[] sampleRows) {
+               super(data);
+               _sampleRows = sampleRows;
+               _rng = new RandomDataGenerator();
+               _numRows = CompressedMatrixBlock.TRANSPOSE_INPUT ? 
+                               _data.getNumColumns() : _data.getNumRows();
+       }
+
+       /**
+        * 
+        * @param mb
+        * @param sampleSize
+        */
+       public CompressedSizeEstimatorSample(MatrixBlock mb, int sampleSize) {
+               this(mb, null);
+               _sampleRows = getSortedUniformSample(_numRows, sampleSize);
+       }
+
+       /**
+        * 
+        * @param sampleRows, assumed to be sorted
+        */
+       public void setSampleRows(int[] sampleRows) {
+               _sampleRows = sampleRows;
+       }
+
+       /**
+        * 
+        * @param sampleSize
+        */
+       public void resampleRows(int sampleSize) {
+               _sampleRows = getSortedUniformSample(_numRows, sampleSize);
+       }
+
+       @Override
+       public CompressedSizeInfo estimateCompressedColGroupSize(int[] colIndexes) 
+       {
+               //extract statistics from sample
+               UncompressedBitmap ubm = BitmapEncoder.extractBitmapFromSample(
+                               colIndexes, _data, _sampleRows);
+               SizeEstimationFactors fact = computeSizeEstimationFactors(ubm, false);
+
+               //estimate number of distinct values 
+               int totalCardinality = getNumDistinctValues(colIndexes);
+               totalCardinality = Math.max(totalCardinality, fact.numVals); //fix anomalies w/ large sample fraction
+               totalCardinality = Math.min(totalCardinality, _numRows); //fix anomalies w/ large sample fraction
+               
+               //estimate unseen values
+               // each unseen is assumed to occur only once (it did not show up in the sample because it is rare)
+               int unseen = Math.max(0, totalCardinality - fact.numVals);
+               int sampleSize = _sampleRows.length;
+               
+               //estimate number of offsets
+               double sparsity = OptimizerUtils.getSparsity(
+                               _data.getNumRows(), _data.getNumColumns(), _data.getNonZeros());
+               
+               // expected value given that we don't store the zero values
+               float totalNumOffs = (float) (_numRows * (1 - Math.pow(1 - sparsity,colIndexes.length)));
+               if( CORRECT_NONZERO_ESTIMATE ) {
+                       long numZeros = sampleSize - fact.numOffs;
+                       float C = Math.max(1-(float)fact.numSingle/sampleSize, (float)sampleSize/_numRows); 
+                       totalNumOffs = _numRows - ((numZeros>0)? (float)_numRows/sampleSize*C*numZeros : 0);
+               }
+               
+               // For a single offset, the number of blocks depends on the value of
+               // that offset. Small offsets (first group of rows in the matrix)
+               // require a small number of blocks and large offsets (last group of
+               // rows) require a large number of blocks. The unseen offsets are
+               // distributed over the entire offset range. A reasonable and fast
+               // estimate for the number of blocks is to use the arithmetic mean of
+               // the number of blocks used for the first index (=1) and that of the
+               // last index.
+               int numUnseenSeg = Math.round(unseen
+                               * (2.0f * BitmapEncoder.BITMAP_BLOCK_SZ + _numRows) / 2
+                               / BitmapEncoder.BITMAP_BLOCK_SZ);
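+               // (illustrative numbers, not part of this change: if BITMAP_BLOCK_SZ were 65,536
+               //  and _numRows were 1,000,000, each unseen value would be charged roughly
+               //  (2*65,536 + 1,000,000) / 2 / 65,536 ~ 8.6 segments)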
+               int totalNumSeg = fact.numSegs + numUnseenSeg;
+               int totalNumRuns = getNumRuns(ubm, sampleSize, _numRows) + unseen;
+
+               //construct new size info summary
+               return new CompressedSizeInfo(totalCardinality,
+                               getRLESize(totalCardinality, totalNumRuns, colIndexes.length),
+                               getOLESize(totalCardinality, totalNumOffs, totalNumSeg, colIndexes.length));
+       }
+
+       @Override
+       public CompressedSizeInfo estimateCompressedColGroupSize(UncompressedBitmap ubm) 
+       {
+               //compute size estimation factors
+               SizeEstimationFactors fact = computeSizeEstimationFactors(ubm, true);
+               
+               //construct new size info summary
+               return new CompressedSizeInfo(fact.numVals,
+                               getRLESize(fact.numVals, fact.numRuns, ubm.getNumColumns()),
+                               getOLESize(fact.numVals, fact.numOffs, fact.numSegs, ubm.getNumColumns()));
+       }
+       
+       /**
+        * 
+        * @param colIndexes
+        * @return
+        */
+       private int getNumDistinctValues(int[] colIndexes) {
+               return haasAndStokes(colIndexes);
+       }
+
+       /**
+        * 
+        * @param sampleUncompressedBitmap
+        * @param sampleSize
+        * @param totalNumRows
+        * @return
+        */
+       private int getNumRuns(UncompressedBitmap sampleUncompressedBitmap,
+                       int sampleSize, int totalNumRows) {
+               int numVals = sampleUncompressedBitmap.getNumValues();
+               // all values in the sample are zeros
+               if (numVals == 0)
+                       return 0;
+               float numRuns = 0;
+               for (int vi = 0; vi < numVals; vi++) {
+                       int[] offsets = sampleUncompressedBitmap.getOffsetsList(vi);
+                       float offsetsRatio = ((float) offsets.length) / sampleSize;
+                       float avgAdditionalOffsets = offsetsRatio * totalNumRows
+                                       / sampleSize;
+                       if (avgAdditionalOffsets < 1) {
+                               // Ising-Stevens does not hold
+                               // fall-back to using the expected number of 
offsets as an upper
+                               // bound on the number of runs
+                               numRuns += ((float) offsets.length) * 
totalNumRows / sampleSize;
+                               continue;
+                       }
+                       int intervalEnd, intervalSize;
+                       float additionalOffsets;
+                       // probability of an index being non-offset in current and previous
+                       // interval respectively
+                       float nonOffsetProb, prevNonOffsetProb = 1;
+                       boolean reachedSampleEnd = false;
+                       // handling the first interval separately for simplicity
+                       int intervalStart = -1;
+                       if (_sampleRows[0] == 0) {
+                               // empty interval
+                               intervalStart = 0;
+                       } else {
+                               intervalEnd = _sampleRows[0];
+                               intervalSize = intervalEnd - intervalStart - 1;
+                               // expected value of a multivariate hypergeometric distribution
+                               additionalOffsets = offsetsRatio * intervalSize;
+                               // expected value of an Ising-Stevens distribution
+                               numRuns += (intervalSize - additionalOffsets)
+                                               * additionalOffsets / intervalSize;
+                               intervalStart = intervalEnd;
+                               prevNonOffsetProb = (intervalSize - additionalOffsets)
+                                               / intervalSize;
+                       }
+                       // for handling separators
+
+                       int withinSepRun = 0;
+                       boolean seenNonOffset = false, startedWithOffset = false, endedWithOffset = false;
+                       int offsetsPtrs = 0;
+                       for (int ix = 1; ix < sampleSize; ix++) {
+                               // start of a new separator
+                               // intervalStart will always be pointing at the current value
+                               // in the separator block
+
+                               if (offsetsPtrs < offsets.length
+                                               && offsets[offsetsPtrs] == intervalStart) {
+                                       startedWithOffset = true;
+                                       offsetsPtrs++;
+                                       endedWithOffset = true;
+                               } else {
+                                       seenNonOffset = true;
+                                       endedWithOffset = false;
+                               }
+                               while (intervalStart + 1 == _sampleRows[ix]) {
+                                       intervalStart = _sampleRows[ix];
+                                       if (seenNonOffset) {
+                                               if (offsetsPtrs < offsets.length
+                                                               && offsets[offsetsPtrs] == intervalStart) {
+                                                       withinSepRun = 1;
+                                                       offsetsPtrs++;
+                                                       endedWithOffset = true;
+                                               } else {
+                                                       numRuns += withinSepRun;
+                                                       withinSepRun = 0;
+                                                       endedWithOffset = false;
+                                               }
+                                       } else if (offsetsPtrs < offsets.length
+                                                       && offsets[offsetsPtrs] == intervalStart) {
+                                               offsetsPtrs++;
+                                               endedWithOffset = true;
+                                       } else {
+                                               seenNonOffset = true;
+                                               endedWithOffset = false;
+                                       }
+                                       //
+                                       ix++;
+                                       if (ix == sampleSize) {
+                                               // reached end of sample while searching for a separator start
+                                               reachedSampleEnd = true;
+                                               break;
+                                       }
+                               }
+
+                               // runs within an interval of unknowns
+                               if (reachedSampleEnd)
+                                       break;
+                               intervalEnd = _sampleRows[ix];
+                               intervalSize = intervalEnd - intervalStart - 1;
+                               // expected value of a multivariate hypergeometric distribution
+                               additionalOffsets = offsetsRatio * intervalSize;
+                               // expected value of an Ising-Stevens distribution
+                               numRuns += (intervalSize - additionalOffsets)
+                                               * additionalOffsets / intervalSize;
+                               nonOffsetProb = (intervalSize - additionalOffsets)
+                                               / intervalSize;
+
+                               // additional runs resulting from x's on the boundaries of the
+                               // separators
+                               // endedWithOffset = findInArray(offsets, intervalStart) != -1;
+                               if (seenNonOffset) {
+                                       if (startedWithOffset) {
+                                               // add p(y in the previous interval)
+                                               numRuns += prevNonOffsetProb;
+                                       }
+                                       if (endedWithOffset) {
+                                               // add p(y in the current interval)
+                                               numRuns += nonOffsetProb;
+                                       }
+                               } else {
+                                       // add p(y in the previous interval and y in the current
+                                       // interval)
+                                       numRuns += prevNonOffsetProb * nonOffsetProb;
+                               }
+                               prevNonOffsetProb = nonOffsetProb;
+                               intervalStart = intervalEnd;
+                               // resetting separator variables
+                               seenNonOffset = startedWithOffset = endedWithOffset = false;
+                               withinSepRun = 0;
+
+                       }
+                       // last possible interval
+                       if (intervalStart != totalNumRows - 1) {
+                               intervalEnd = totalNumRows;
+                               intervalSize = intervalEnd - intervalStart - 1;
+                               // expected value of a multivariate hypergeometric distribution
+                               additionalOffsets = offsetsRatio * intervalSize;
+                               // expected value of an Ising-Stevens distribution
+                               numRuns += (intervalSize - additionalOffsets)
+                                               * additionalOffsets / intervalSize;
+                               nonOffsetProb = (intervalSize - additionalOffsets)
+                                               / intervalSize;
+                       } else {
+                               nonOffsetProb = 1;
+                       }
+                       // additional runs resulting from x's on the boundaries of the
+                       // separators
+                       endedWithOffset = intervalStart == offsets[offsets.length - 1];
+                       if (seenNonOffset) {
+                               if (startedWithOffset) {
+                                       numRuns += prevNonOffsetProb;
+                               }
+                               if (endedWithOffset) {
+                                       // add p(y in the current interval)
+                                       numRuns += nonOffsetProb;
+                               }
+                       } else {
+                               if (endedWithOffset)
+                                       // add p(y in the previous interval and y in the current
+                                       // interval)
+                                       numRuns += prevNonOffsetProb * nonOffsetProb;
+                       }
+               }
+               return Math.round(numRuns);
+       }
+
+       /**
+        * 
+        * @param colIndexes
+        * @return
+        */
+       private int haasAndStokes(int[] colIndexes) {
+               ReaderColumnSelection reader = new ReaderColumnSelectionDenseSample(_data, 
+                               colIndexes, _sampleRows, !CompressedMatrixBlock.MATERIALIZE_ZEROS);
+               return haasAndStokes(_numRows, _sampleRows.length, reader);
+       }
+
+       /**
+        * TODO remove, just for local debugging.
+        * 
+        * @param colIndexes
+        * @return
+        */
+       @SuppressWarnings("unused")
+       private int getExactNumDistinctValues(int[] colIndexes) {
+               HashSet<DblArray> distinctVals = new HashSet<DblArray>();
+               ReaderColumnSelection reader = (_data.isInSparseFormat() && CompressedMatrixBlock.TRANSPOSE_INPUT) ? 
+                               new ReaderColumnSelectionSparse(_data, colIndexes, !CompressedMatrixBlock.MATERIALIZE_ZEROS) : 
+                               new ReaderColumnSelectionDense(_data, colIndexes, !CompressedMatrixBlock.MATERIALIZE_ZEROS);
+               DblArray val = null;
+               while (null != (val = reader.nextRow()))
+                       distinctVals.add(val);
+               return distinctVals.size();
+       }
+
+       /**
+        * Returns a sorted array of n integers, drawn uniformly from the range [0,range).
+        * 
+        * @param range
+        * @param smplSize
+        * @return
+        */
+       private int[] getSortedUniformSample(int range, int smplSize) {
+               if (smplSize == 0)
+                       return new int[] {};
+               int[] sample = _rng.nextPermutation(range, smplSize);
+               Arrays.sort(sample);
+               return sample;
+       }
+       
+
+       /////////////////////////////////////////////////////
+       // Sample Cardinality Estimator library
+       /////////////////////////////////////////
+       
+       /**
+        * M. Charikar, S. Chaudhuri, R. Motwani, and V. R. Narasayya, Towards
+        * estimation error guarantees for distinct values, PODS'00.
+        * 
+        * @param nRows
+        * @param sampleSize
+        * @param sampleRowsReader
+        *            : a reader for the sampled rows
+        * @return
+        */
+       @SuppressWarnings("unused")
+       private static int guaranteedErrorEstimator(int nRows, int sampleSize,
+                       ReaderColumnSelection sampleRowsReader) {
+               HashMap<DblArray, Integer> valsCount = getValCounts(sampleRowsReader);
+               // number of values that occur only once
+               int singletonValsCount = 0;
+               int otherValsCount = 0;
+               for (Integer c : valsCount.values()) {
+                       if (c == 1)
+                               singletonValsCount++;
+                       else
+                               otherValsCount++;
+               }
+               return (int) Math.round(otherValsCount + singletonValsCount
+                               * Math.sqrt(((double) nRows) / sampleSize));
+       }
+
+       /**
+        * Peter J. Haas, Jeffrey F. Naughton, S. Seshadri, and Lynne Stokes. 
+        * Sampling-Based Estimation of the Number of Distinct Values of an
+        * Attribute. VLDB'95, Section 3.2.
+        * 
+        * @param nRows
+        * @param sampleSize
+        * @param sampleRowsReader
+        * @return
+        */
+       @SuppressWarnings("unused")
+       private static int shlosserEstimator(int nRows, int sampleSize,
+                       ReaderColumnSelection sampleRowsReader) 
+       {
+               return shlosserEstimator(nRows, sampleSize, sampleRowsReader,
+                               getValCounts(sampleRowsReader));
+       }
+
+       /**
+        * 
+        * @param nRows
+        * @param sampleSize
+        * @param sampleRowsReader
+        * @param valsCount
+        * @return
+        */
+       private static int shlosserEstimator(int nRows, int sampleSize,
+                       ReaderColumnSelection sampleRowsReader,
+                       HashMap<DblArray, Integer> valsCount) 
+       {
+               double q = ((double) sampleSize) / nRows;
+               double oneMinusQ = 1 - q;
+
+               int[] freqCounts = getFreqCounts(valsCount);
+
+               double numerSum = 0, denomSum = 0;
+               int iPlusOne = 1;
+               for (int i = 0; i < freqCounts.length; i++, iPlusOne++) {
+                       numerSum += Math.pow(oneMinusQ, iPlusOne) * freqCounts[i];
+                       denomSum += iPlusOne * q * Math.pow(oneMinusQ, i) * freqCounts[i];
+               }
+               int estimate = (int) Math.round(valsCount.size() + freqCounts[0]
+                               * numerSum / denomSum);
+               return estimate < 1 ? 1 : estimate;
+       }
+
+       /**
+        * Peter J. Haas, Jeffrey F. Naughton, S. Seshadri, and Lynne Stokes.
+        * Sampling-Based Estimation of the Number of Distinct Values of an
+        * Attribute. VLDB'95, Section 4.3.
+        * 
+        * @param nRows
+        * @param sampleSize
+        * @param sampleRowsReader
+        * @return
+        */
+       @SuppressWarnings("unused")
+       private static int smoothedJackknifeEstimator(int nRows, int sampleSize,
+                       ReaderColumnSelection sampleRowsReader) 
+       {
+               return smoothedJackknifeEstimator(nRows, sampleSize, sampleRowsReader,
+                               getValCounts(sampleRowsReader));
+       }
+
+       /**
+        * 
+        * @param nRows
+        * @param sampleSize
+        * @param sampleRowsReader
+        * @param valsCount
+        * @return
+        */
+       private static int smoothedJackknifeEstimator(int nRows, int sampleSize,
+                       ReaderColumnSelection sampleRowsReader,
+                       HashMap<DblArray, Integer> valsCount) 
+       {
+               int[] freqCounts = getFreqCounts(valsCount);
+               // all values in the sample are zeros
+               if (freqCounts.length == 0)
+                       return 0;
+               // nRows is N and sampleSize is n
+
+               int d = valsCount.size();
+               double f1 = freqCounts[0];
+               int Nn = nRows * sampleSize;
+               double D0 = (d - f1 / sampleSize)
+                               / (1 - (nRows - sampleSize + 1) * f1 / Nn);
+               double NTilde = nRows / D0;
+               /*-
+                *
+                * h (as defined in eq. 5 in the paper) can be implemented as:
+                * 
+                * double h = Gamma.gamma(nRows - NTilde + 1) x Gamma.gamma(nRows - sampleSize + 1)
+                *            ---------------------------------------------------------------------
+                *            Gamma.gamma(nRows - sampleSize - NTilde + 1) x Gamma.gamma(nRows + 1)
+                * 
+                * 
+                * However, for large values of nRows, Gamma.gamma returns NaN
+                * (factorial of a very large number).
+                * 
+                * The following implementation solves this problem by leveraging the
+                * cancellations that show up when expanding the factorials in the
+                * numerator and the denominator.
+                * 
+                * 
+                *              min(A,D-1) x [min(A,D-1) -1] x .... x B
+                * h = -------------------------------------------
+                *              C x [C-1] x .... x max(A+1,D)
+                * 
+                * where A = N-\tilde{N}
+                *       B = N-\tilde{N} - n + a
+                *       C = N
+                *       D = N-n+1
+                *       
+                *              
+                *
+                */
+               double A = (int) nRows - NTilde;
+               double B = A - sampleSize + 1;
+               double C = nRows;
+               double D = nRows - sampleSize + 1;
+               A = Math.min(A, D - 1);
+               D = Math.max(A + 1, D);
+               double h = 1;
+
+               for (; A >= B || C >= D; A--, C--) {
+                       if (A >= B)
+                               h *= A;
+                       if (C >= D)
+                               h /= C;
+               }
+               // end of h computation
+
+               double g = 0, gamma = 0;
+               // k here corresponds to k+1 in the paper (the +1 comes from replacing n
+               // with n-1)
+               for (int k = 2; k <= sampleSize + 1; k++) {
+                       g += 1.0 / (nRows - NTilde - sampleSize + k);
+               }
+               for (int i = 1; i <= freqCounts.length; i++) {
+                       gamma += i * (i - 1) * freqCounts[i - 1];
+               }
+               gamma *= (nRows - 1) * D0 / Nn / (sampleSize - 1);
+               gamma += D0 / nRows - 1;
+
+               double estimate = (d + nRows * h * g * gamma)
+                               / (1 - (nRows - NTilde - sampleSize + 1) * f1 / Nn);
+               return estimate < 1 ? 1 : (int) Math.round(estimate);
+       }
+
+       /**
+        * Peter J. Haas, Jeffrey F. Naughton, S. Seshadri, and Lynne Stokes. 1995.
+        * Sampling-Based Estimation of the Number of Distinct Values of an
+        * Attribute. VLDB'95, Section 5.2, recommended estimator by the authors
+        * 
+        * @param nRows
+        * @param sampleSize
+        * @param sampleRowsReader
+        * @return
+        */
+       @SuppressWarnings("unused")
+       private static int shlosserJackknifeEstimator(int nRows, int sampleSize,
+                       ReaderColumnSelection sampleRowsReader) {
+               HashMap<DblArray, Integer> valsCount = getValCounts(sampleRowsReader);
+
+               // uniformity chi-square test
+               double nBar = ((double) sampleSize) / valsCount.size();
+               // test-statistic
+               double u = 0;
+               for (int cnt : valsCount.values()) {
+                       u += Math.pow(cnt - nBar, 2);
+               }
+               u /= nBar;
+               if (sampleSize != usedSampleSize)
+                       computeCriticalValue(sampleSize);
+               if (u < uniformityCriticalValue) {
+                       // uniform
+                       return smoothedJackknifeEstimator(nRows, sampleSize,
+                                       sampleRowsReader, valsCount);
+               } else {
+                       return shlosserEstimator(nRows, sampleSize, sampleRowsReader,
+                                       valsCount);
+               }
+       }
+
+       /*
+        * In the shlosserSmoothedJackknifeEstimator, as long as the sample size did
+        * not change, we will have the same critical value each time the estimator
+        * is used (given that alpha is the same). We cache the critical value to
+        * avoid recomputing it in each call.
+        */
+       private static double uniformityCriticalValue;
+       private static int usedSampleSize;
+       
+       private static void computeCriticalValue(int sampleSize) {
+               ChiSquaredDistribution chiSqr = new ChiSquaredDistribution(sampleSize - 1);
+               uniformityCriticalValue = chiSqr.inverseCumulativeProbability(SHLOSSER_JACKKNIFE_ALPHA);
+               usedSampleSize = sampleSize;
+       }
+
+       /**
+        * Haas, Peter J., and Lynne Stokes.
+        * "Estimating the number of classes in a finite population." Journal of the
+        * American Statistical Association 93.444 (1998): 1475-1487.
+        * 
+        * The hybrid estimator given by Eq. 33 in Section 6
+        * 
+        * @param nRows
+        * @param sampleSize
+        * @param sampleRowsReader
+        * @return
+        */
+       private static int haasAndStokes(int nRows, int sampleSize,
+                       ReaderColumnSelection sampleRowsReader) 
+       {
+               HashMap<DblArray, Integer> valsCount = getValCounts(sampleRowsReader);
+               // all values in the sample are zeros.
+               if (valsCount.size() == 0)
+                       return 1;
+               int[] freqCounts = getFreqCounts(valsCount);
+               float q = ((float) sampleSize) / nRows;
+               float _1MinusQ = 1 - q;
+               // Eq. 11
+               float duj1Fraction = ((float) sampleSize)
+                               / (sampleSize - _1MinusQ * freqCounts[0]);
+               float duj1 = duj1Fraction * valsCount.size();
+               // Eq. 16
+               float gamma = 0;
+               for (int i = 1; i <= freqCounts.length; i++) {
+                       gamma += i * (i - 1) * freqCounts[i - 1];
+               }
+               gamma *= duj1 / sampleSize / sampleSize;
+               gamma += duj1 / nRows - 1;
+               gamma = Math.max(gamma, 0);
+               int estimate;
+               
+               if (gamma < HAAS_AND_STOKES_ALPHA1) {
+                       // UJ2 - beginning of page 1479
+               //      System.out.println("uj2");
+                       estimate = (int) (duj1Fraction * (valsCount.size() - freqCounts[0]
+                                       * _1MinusQ * Math.log(_1MinusQ) * gamma / q));
+               } else if (gamma < HAAS_AND_STOKES_ALPHA2) {
+                       // UJ2a - end of page 1998
+                       //System.out.println("uj2a");
+                       int numRemovedClasses = 0;
+                       float updatedNumRows = nRows;
+                       int updatedSampleSize = sampleSize;
+
+                       for (Integer cnt : valsCount.values()) {
+                               if (cnt > HAAS_AND_STOKES_UJ2A_C) {
+                                       numRemovedClasses++;
+                                       freqCounts[cnt - 1]--;
+                                       updatedSampleSize -= cnt;
+                                       /*
+                                        * To avoid solving Eq. 20 numerically for the class size in
+                                        * the full population (N_j), the current implementation
+                                        * just scales cnt (n_j) by the sampling ratio (q).
+                                        * Intuitively, the scaling should be fine since cnt is
+                                        * large enough. Also, N_j in Eq. 20 is lower-bounded by cnt
+                                        * which is already large enough to make the denominator in
+                                        * Eq. 20 very close to 1.
+                                        */
+                                       updatedNumRows -= ((float) cnt) / q;
+                               }
+                       }
+                       if (updatedSampleSize == 0) {
+                               // use uJ2a
+                               
+                               estimate = (int) (duj1Fraction * (valsCount.size() - freqCounts[0]
+                                               * (_1MinusQ) * Math.log(_1MinusQ) * gamma / q));
+                       } else {
+                               float updatedQ = ((float) updatedSampleSize) / updatedNumRows;
+                               int updatedSampleCardinality = valsCount.size()
+                                               - numRemovedClasses;
+                               float updatedDuj1Fraction = ((float) updatedSampleSize)
+                                               / (updatedSampleSize - (1 - updatedQ) * freqCounts[0]);
+                               float updatedDuj1 = updatedDuj1Fraction
+                                               * updatedSampleCardinality;
+                               float updatedGamma = 0;
+                               for (int i = 1; i <= freqCounts.length; i++) {
+                                       updatedGamma += i * (i - 1) * freqCounts[i - 1];
+                               }
+                               updatedGamma *= updatedDuj1 / updatedSampleSize
+                                               / updatedSampleSize;
+                               updatedGamma += updatedDuj1 / updatedNumRows - 1;
+                               updatedGamma = Math.max(updatedGamma, 0);
+
+                               estimate = (int) (updatedDuj1Fraction * (updatedSampleCardinality - freqCounts[0]
+                                               * (1 - updatedQ)
+                                               * Math.log(1 - updatedQ)
+                                               * updatedGamma / updatedQ))
+                                               + numRemovedClasses;
+                       }
+
+               } else {
+                       // Sh3 - end of section 3
+                       float fraq1Numer = 0;
+                       float fraq1Denom = 0;
+                       float fraq2Numer = 0;
+                       float fraq2Denom = 0;
+                       for (int i = 1; i <= freqCounts.length; i++) {
+                               fraq1Numer += i * q * q * Math.pow(1 - q * q, i - 1)
+                                               * freqCounts[i - 1];
+                               fraq1Denom += Math.pow(_1MinusQ, i) * (Math.pow(1 + q, i) - 1)
+                                               * freqCounts[i - 1];
+                               fraq2Numer += Math.pow(_1MinusQ, i) * freqCounts[i - 1];
+                               fraq2Denom += i * q * Math.pow(_1MinusQ, i - 1)
+                                               * freqCounts[i - 1];
+                       }
+                       estimate = (int) (valsCount.size() + freqCounts[0] * fraq1Numer
+                                       / fraq1Denom * fraq2Numer * fraq2Numer / fraq2Denom
+                                       / fraq2Denom);
+               }
+               return estimate < 1 ? 1 : estimate;
+       }
+
+       /**
+        * 
+        * @param sampleRowsReader
+        * @return
+        */
+       private static HashMap<DblArray, Integer> getValCounts(
+                       ReaderColumnSelection sampleRowsReader) 
+       {
+               HashMap<DblArray, Integer> valsCount = new HashMap<DblArray, Integer>();
+               DblArray val = null;
+               Integer cnt;
+               while (null != (val = sampleRowsReader.nextRow())) {
+                       cnt = valsCount.get(val);
+                       if (cnt == null)
+                               cnt = 0;
+                       cnt++;
+                       valsCount.put(val, cnt);
+               }
+               return valsCount;
+       }
+
+       /**
+        * 
+        * @param valsCount
+        * @return
+        */
+       private static int[] getFreqCounts(HashMap<DblArray, Integer> valsCount) 
+       {
+               int maxCount = 0;
+               for (Integer c : valsCount.values()) {
+                       if (c > maxCount)
+                               maxCount = c;
+               }
+               
+               /*
+                * freqCounts[i-1] = how many values occurred with a frequency i
+                */
+               int[] freqCounts = new int[maxCount];
+               for (Integer c : valsCount.values()) {
+                       freqCounts[c - 1]++;
+               }
+               return freqCounts;
+
+       }
+}
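
The sample-based estimators above share one pattern: count how often each distinct row-tuple occurs in the sample (getValCounts), fold that into frequency-of-frequency counts (getFreqCounts), and plug those counts into a closed-form formula. A rough standalone sketch, not part of the patch and using made-up sample counts instead of a real ReaderColumnSelection, mirrors the Shlosser formula from shlosserEstimator:

    // illustration only; nRows, sampleSize and freqCounts are hypothetical
    int nRows = 10000, sampleSize = 100;
    int[] freqCounts = { 40, 10, 5 };   // f_i = #values seen exactly i times in the sample
    int d = 40 + 10 + 5;                // distinct values observed in the sample
    double q = ((double) sampleSize) / nRows;
    double numerSum = 0, denomSum = 0;
    for (int i = 0; i < freqCounts.length; i++) {
        numerSum += Math.pow(1 - q, i + 1) * freqCounts[i];
        denomSum += (i + 1) * q * Math.pow(1 - q, i) * freqCounts[i];
    }
    // Shlosser: d + f1 * numerSum / denomSum, clamped to at least 1
    long estimate = Math.max(1, Math.round(d + freqCounts[0] * numerSum / denomSum));

The hybrid haasAndStokes estimator then selects between the uJ2, uJ2a, and Sh3 variants using the gamma statistic computed from the same frequency counts (compared against HAAS_AND_STOKES_ALPHA1/ALPHA2).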

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/16e7b1c8/src/main/java/org/apache/sysml/runtime/compress/estim/CompressedSizeInfo.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/compress/estim/CompressedSizeInfo.java 
b/src/main/java/org/apache/sysml/runtime/compress/estim/CompressedSizeInfo.java
new file mode 100644
index 0000000..834483e
--- /dev/null
+++ 
b/src/main/java/org/apache/sysml/runtime/compress/estim/CompressedSizeInfo.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.compress.estim;
+
+/**
+ * 
+ * A helper reusable object for maintaining bitmap sizes
+ */
+public class CompressedSizeInfo 
+{
+       private int _estCard = -1;
+       private long _rleSize = -1; 
+       private long _oleSize = -1;
+
+       public CompressedSizeInfo() {
+               
+       }
+
+       public CompressedSizeInfo(int estCard, long rleSize, long oleSize) {
+               _estCard = estCard;
+               _rleSize = rleSize;
+               _oleSize = oleSize;
+       }
+
+       public void setRLESize(long rleSize) {
+               _rleSize = rleSize;
+       }
+       
+       public long getRLESize() {
+               return _rleSize;
+       }
+       
+       public void setOLESize(long oleSize) {
+               _oleSize = oleSize;
+       }
+
+       public long getOLESize() {
+               return _oleSize;
+       }
+
+       public long getMinSize() {
+               return Math.min(_rleSize, _oleSize);
+       }
+
+       public void setEstCardinality(int estCard) {
+               _estCard = estCard;
+       }
+
+       public int getEstCarinality() {
+               return _estCard;
+       }
+}
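
A brief usage sketch of this holder (the sizes are made-up byte counts, as a caller would receive them from the estimators above):

    CompressedSizeInfo info = new CompressedSizeInfo(1000, 64000L, 48000L); // estCard, RLE size, OLE size
    long best = info.getMinSize();                                          // 48000: the cheaper encoding
    boolean preferOLE = info.getOLESize() <= info.getRLESize();             // true in this example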

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/16e7b1c8/src/main/java/org/apache/sysml/runtime/compress/estim/SizeEstimatorFactory.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/compress/estim/SizeEstimatorFactory.java
 
b/src/main/java/org/apache/sysml/runtime/compress/estim/SizeEstimatorFactory.java
new file mode 100644
index 0000000..f857b5b
--- /dev/null
+++ 
b/src/main/java/org/apache/sysml/runtime/compress/estim/SizeEstimatorFactory.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.compress.estim;
+
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+
+public class SizeEstimatorFactory 
+{
+       public static final float SAMPLING_RATIO = 0.01f; //conservative default
+       
+       /**
+        * 
+        * @param data
+        * @param numRows
+        * @return
+        */
+       @SuppressWarnings("unused")
+       public static CompressedSizeEstimator getSizeEstimator(MatrixBlock data, int numRows) {
+               return (SAMPLING_RATIO == 1.0) ?
+                               new CompressedSizeEstimatorExact(data):
+                               new CompressedSizeEstimatorSample(data, (int) (numRows*SAMPLING_RATIO));
+       }
+}
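
A hedged usage sketch of the factory (assuming MatrixBlock's (rows, cols, sparse) constructor; with the default 1% ratio the sample-based estimator is returned):

    MatrixBlock mb = new MatrixBlock(10000, 10, false);   // hypothetical 10000x10 dense block
    CompressedSizeEstimator est = SizeEstimatorFactory.getSizeEstimator(mb, 10000);
    // SAMPLING_RATIO = 0.01f, so this yields a CompressedSizeEstimatorSample over ~100 sampled rows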

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/16e7b1c8/src/main/java/org/apache/sysml/runtime/compress/utils/ConverterUtils.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/compress/utils/ConverterUtils.java 
b/src/main/java/org/apache/sysml/runtime/compress/utils/ConverterUtils.java
new file mode 100644
index 0000000..e87ac29
--- /dev/null
+++ b/src/main/java/org/apache/sysml/runtime/compress/utils/ConverterUtils.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.compress.utils;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+
+import org.apache.sysml.runtime.compress.ColGroup;
+import org.apache.sysml.runtime.compress.ColGroupOLE;
+import org.apache.sysml.runtime.compress.ColGroupRLE;
+import org.apache.sysml.runtime.compress.ColGroupUncompressed;
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+import org.apache.sysml.runtime.util.DataConverter;
+
+public class ConverterUtils 
+{
+       /**
+        * Copy col group instance with deep copy of column indices but
+        * shallow copy of actual contents;
+        * 
+        * @param group
+        * @return
+        */
+       public static ColGroup copyColGroup(ColGroup group)
+       {
+               ColGroup ret = null;
+               
+               //deep copy col indices
+               int[] colIndices = Arrays.copyOf(group.getColIndices(), group.getNumCols());
+               
+               //create copy of column group
+               if( group instanceof ColGroupUncompressed ) {
+                       ColGroupUncompressed in = (ColGroupUncompressed)group;
+                       ret = new ColGroupUncompressed(colIndices, in.getNumRows(), in.getData());
+               }
+               else if( group instanceof ColGroupRLE ) {
+                       ColGroupRLE in = (ColGroupRLE)group;
+                       ret = new ColGroupRLE(colIndices, in.getNumRows(), in.getValues(), 
+                                       in.getBitmaps(), in.getBitmapOffsets());
+               }
+               else if( group instanceof ColGroupOLE ) {
+                       ColGroupOLE in = (ColGroupOLE) group;
+                       ret = new ColGroupOLE(colIndices, in.getNumRows(), in.getValues(), 
+                                       in.getBitmaps(), in.getBitmapOffsets());
+               }
+               
+               return ret;
+       }
+       
+       /**
+        * 
+        * @param vector
+        * @return
+        */
+       public static double[] getDenseVector( MatrixBlock vector )
+       {
+               if( vector.isInSparseFormat() )
+                       return DataConverter.convertToDoubleVector(vector);
+               else 
+                       return vector.getDenseBlock();
+       }
+       
+       /**
+        * 
+        * @param group
+        * @return
+        */
+       public static MatrixBlock getUncompressedColBlock( ColGroup group )
+       {
+               MatrixBlock ret = null;
+               if( group instanceof ColGroupUncompressed ) {
+                       ret = ((ColGroupUncompressed) group).getData();
+               }
+               else {
+                       ArrayList<ColGroup> tmpGroup = new ArrayList<ColGroup>(Arrays.asList(group));
+                       ColGroupUncompressed decompressedCols = new ColGroupUncompressed(tmpGroup);
+                       ret = decompressedCols.getData();
+               }
+               
+               return ret;
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/16e7b1c8/src/main/java/org/apache/sysml/runtime/compress/utils/DblArray.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/compress/utils/DblArray.java 
b/src/main/java/org/apache/sysml/runtime/compress/utils/DblArray.java
new file mode 100644
index 0000000..49c163b
--- /dev/null
+++ b/src/main/java/org/apache/sysml/runtime/compress/utils/DblArray.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.compress.utils;
+
+import java.util.Arrays;
+
+/**
+ * Helper class used for bitmap extraction.
+ *
+ */
+public class DblArray 
+{
+       private double[] _arr = null;
+       private boolean _zero = false;
+       
+       public DblArray() {
+               this(null, false);
+       }
+       
+       public DblArray(double[] arr) {
+               this(arr, false);
+       }
+       
+       public DblArray(DblArray that) {
+               this(Arrays.copyOf(that._arr, that._arr.length), that._zero);
+       }
+
+       public DblArray(double[] arr, boolean allZeros) {
+               _arr = arr;
+               _zero = allZeros;
+       }
+       
+       public double[] getData() {
+               return _arr;
+       }
+       
+       @Override
+       public int hashCode() {
+               return _zero ? 0 : Arrays.hashCode(_arr);
+       }
+
+       @Override
+       public boolean equals(Object o) {
+               return ( o instanceof DblArray
+                       && _zero == ((DblArray) o)._zero
+                       && Arrays.equals(_arr, ((DblArray) o)._arr) );
+       }
+
+       @Override
+       public String toString() {
+               return Arrays.toString(_arr);
+       }
+
+       /**
+        * 
+        * @param ds
+        * @return
+        */
+       public static boolean isZero(double[] ds) {
+               for (int i = 0; i < ds.length; i++)
+                       if (ds[i] != 0.0)
+                               return false;
+               return true;
+       }
+
+       /**
+        * 
+        * @param val
+        * @return
+        */
+       public static boolean isZero(DblArray val) {
+               return val._zero || isZero(val._arr);
+       }
+}
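
Since DblArray serves as a hash-map key during bitmap extraction, equals and hashCode operate on the array contents; a short sketch with made-up values:

    DblArray a = new DblArray(new double[]{ 1.0, 0.0, 2.5 });
    DblArray b = new DblArray(new double[]{ 1.0, 0.0, 2.5 });
    boolean sameKey = a.equals(b) && a.hashCode() == b.hashCode();             // true: equal contents, same bucket
    boolean zeroRow = DblArray.isZero(new DblArray(new double[]{ 0.0, 0.0 })); // true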

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/16e7b1c8/src/main/java/org/apache/sysml/runtime/compress/utils/DblArrayIntListHashMap.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/compress/utils/DblArrayIntListHashMap.java
 
b/src/main/java/org/apache/sysml/runtime/compress/utils/DblArrayIntListHashMap.java
new file mode 100644
index 0000000..a5455ab
--- /dev/null
+++ 
b/src/main/java/org/apache/sysml/runtime/compress/utils/DblArrayIntListHashMap.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.compress.utils;
+
+import java.util.ArrayList;
+
+/**
+ * This class provides a memory-efficient replacement for
+ * HashMap<DblArray,IntArrayList> for restricted use cases.
+ * 
+ */
+public class DblArrayIntListHashMap 
+{
+       private static final int INIT_CAPACITY = 8;
+       private static final int RESIZE_FACTOR = 2;
+       private static final float LOAD_FACTOR = 0.75f;
+
+       private DArrayIListEntry[] _data = null;
+       private int _size = -1;
+
+       public DblArrayIntListHashMap() {
+               _data = new DArrayIListEntry[INIT_CAPACITY];
+               _size = 0;
+       }
+
+       /**
+        * 
+        * @return
+        */
+       public int size() {
+               return _size;
+       }
+
+       /**
+        * 
+        * @param key
+        * @return
+        */
+       public IntArrayList get(DblArray key) {
+               // probe for early abort
+               if( _size == 0 )
+                       return null;
+
+               // compute entry index position
+               int hash = hash(key);
+               int ix = indexFor(hash, _data.length);
+
+               // find entry
+               for( DArrayIListEntry e = _data[ix]; e != null; e = e.next ) {
+                       if( e.key.equals(key) ) {
+                               return e.value;
+                       }
+               }
+
+               return null;
+       }
+
+       /**
+        * 
+        * @param key
+        * @param value
+        */
+       public void appendValue(DblArray key, IntArrayList value) {
+               // compute entry index position
+               int hash = hash(key);
+               int ix = indexFor(hash, _data.length);
+
+               // add new table entry (constant time)
+               DArrayIListEntry enew = new DArrayIListEntry(key, value);
+               enew.next = _data[ix]; // colliding entries / null
+               _data[ix] = enew;
+               _size++;
+
+               // resize if necessary
+               if( _size >= LOAD_FACTOR * _data.length )
+                       resize();
+       }
+
+       /**
+        * 
+        * @return
+        */
+       public ArrayList<DArrayIListEntry> extractValues() {
+               ArrayList<DArrayIListEntry> ret = new ArrayList<DArrayIListEntry>();
+               for( DArrayIListEntry e : _data ) {
+                       if( e != null ) {
+                               while( e.next != null ) {
+                                       ret.add(e);
+                                       e = e.next;
+                               }
+                               ret.add(e);
+                       }
+               }
+
+               return ret;
+       }
+
+       /**
+     * 
+     */
+       private void resize() {
+               // check for integer overflow on resize
+               if( _data.length > Integer.MAX_VALUE / RESIZE_FACTOR )
+                       return;
+
+               // resize data array and copy existing contents
+               DArrayIListEntry[] olddata = _data;
+               _data = new DArrayIListEntry[_data.length * RESIZE_FACTOR];
+               _size = 0;
+
+               // rehash all entries
+               for( DArrayIListEntry e : olddata ) {
+                       if( e != null ) {
+                               while( e.next != null ) {
+                                       appendValue(e.key, e.value);
+                                       e = e.next;
+                               }
+                               appendValue(e.key, e.value);
+                       }
+               }
+       }
+
+       /**
+        * 
+        * @param key
+        * @return
+        */
+       private static int hash(DblArray key) {
+               int h = key.hashCode();
+
+               // This function ensures that hashCodes that differ only by
+               // constant multiples at each bit position have a bounded
+               // number of collisions (approximately 8 at default load factor).
+               h ^= (h >>> 20) ^ (h >>> 12);
+               return h ^ (h >>> 7) ^ (h >>> 4);
+       }
+
+       /**
+        * 
+        * @param h
+        * @param length
+        * @return
+        */
+       private static int indexFor(int h, int length) {
+               return h & (length - 1);
+       }
+
+       /**
+        *
+        */
+       public class DArrayIListEntry {
+               public DblArray key;
+               public IntArrayList value;
+               public DArrayIListEntry next;
+
+               public DArrayIListEntry(DblArray ekey, IntArrayList evalue) {
+                       key = ekey;
+                       value = evalue;
+                       next = null;
+               }
+       }
+}
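
A small usage sketch of the map, roughly as bitmap extraction would use it to collect the row offsets of one distinct value tuple (the key and offsets are made up):

    DblArrayIntListHashMap map = new DblArrayIntListHashMap();
    DblArray key = new DblArray(new double[]{ 3.0, 7.0 });
    IntArrayList rows = new IntArrayList();
    rows.appendValue(2);
    rows.appendValue(5);
    map.appendValue(key, rows);                        // one entry per distinct tuple
    int[] offsets = map.get(key).extractValues();      // {2, 5}

Note that appendValue always prepends a new entry rather than merging with an existing key, so callers are expected to insert each key at most once.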

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/16e7b1c8/src/main/java/org/apache/sysml/runtime/compress/utils/DoubleIntListHashMap.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/compress/utils/DoubleIntListHashMap.java
 
b/src/main/java/org/apache/sysml/runtime/compress/utils/DoubleIntListHashMap.java
new file mode 100644
index 0000000..5607a3f
--- /dev/null
+++ 
b/src/main/java/org/apache/sysml/runtime/compress/utils/DoubleIntListHashMap.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.compress.utils;
+
+import java.util.ArrayList;
+
+/**
+ * This class provides a memory-efficient replacement for
+ * HashMap<Double,IntArrayList> for restricted use cases.
+ * 
+ */
+public class DoubleIntListHashMap 
+{
+       private static final int INIT_CAPACITY = 8;
+       private static final int RESIZE_FACTOR = 2;
+       private static final float LOAD_FACTOR = 0.75f;
+
+       private DIListEntry[] _data = null;
+       private int _size = -1;
+
+       public DoubleIntListHashMap() {
+               _data = new DIListEntry[INIT_CAPACITY];
+               _size = 0;
+       }
+
+       /**
+        * 
+        * @return
+        */
+       public int size() {
+               return _size;
+       }
+
+       /**
+        * 
+        * @param key
+        * @return
+        */
+       public IntArrayList get(double key) {
+               // probe for early abort
+               if( _size == 0 )
+                       return null;
+
+               // compute entry index position
+               int hash = hash(key);
+               int ix = indexFor(hash, _data.length);
+
+               // find entry
+               for( DIListEntry e = _data[ix]; e != null; e = e.next ) {
+                       if( e.key == key ) {
+                               return e.value;
+                       }
+               }
+
+               return null;
+       }
+
+       /**
+        * 
+        * @param key
+        * @param value
+        */
+       public void appendValue(double key, IntArrayList value) {
+               // compute entry index position
+               int hash = hash(key);
+               int ix = indexFor(hash, _data.length);
+
+               // add new table entry (constant time)
+               DIListEntry enew = new DIListEntry(key, value);
+               enew.next = _data[ix]; // colliding entries / null
+               _data[ix] = enew;
+               _size++;
+
+               // resize if necessary
+               if( _size >= LOAD_FACTOR * _data.length )
+                       resize();
+       }
+
+       /**
+        * 
+        * @return
+        */
+       public ArrayList<DIListEntry> extractValues() {
+               ArrayList<DIListEntry> ret = new ArrayList<DIListEntry>();
+               for( DIListEntry e : _data ) {
+                       if (e != null) {
+                               while( e.next != null ) {
+                                       ret.add(e);
+                                       e = e.next;
+                               }
+                               ret.add(e);
+                       }
+               }
+
+               return ret;
+       }
+
+       /**
+     * 
+     */
+       private void resize() {
+               // check for integer overflow on resize
+               if( _data.length > Integer.MAX_VALUE / RESIZE_FACTOR )
+                       return;
+
+               // resize data array and copy existing contents
+               DIListEntry[] olddata = _data;
+               _data = new DIListEntry[_data.length * RESIZE_FACTOR];
+               _size = 0;
+
+               // rehash all entries
+               for( DIListEntry e : olddata ) {
+                       if( e != null ) {
+                               while( e.next != null ) {
+                                       appendValue(e.key, e.value);
+                                       e = e.next;
+                               }
+                               appendValue(e.key, e.value);
+                       }
+               }
+       }
+
+       /**
+        * 
+        * @param key
+        * @return
+        */
+       private static int hash(double key) {
+               // basic double hash code (w/o object creation)
+               long bits = Double.doubleToRawLongBits(key);
+               int h = (int) (bits ^ (bits >>> 32));
+
+               // This function ensures that hashCodes that differ only by
+               // constant multiples at each bit position have a bounded
+               // number of collisions (approximately 8 at default load factor).
+               h ^= (h >>> 20) ^ (h >>> 12);
+               return h ^ (h >>> 7) ^ (h >>> 4);
+       }
+
+       /**
+        * 
+        * @param h
+        * @param length
+        * @return
+        */
+       private static int indexFor(int h, int length) {
+               return h & (length - 1);
+       }
+
+       /**
+        *
+        */
+       public class DIListEntry {
+               public double key = Double.MAX_VALUE;
+               public IntArrayList value = null;
+               public DIListEntry next = null;
+
+               public DIListEntry(double ekey, IntArrayList evalue) {
+                       key = ekey;
+                       value = evalue;
+                       next = null;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/16e7b1c8/src/main/java/org/apache/sysml/runtime/compress/utils/IntArrayList.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/compress/utils/IntArrayList.java 
b/src/main/java/org/apache/sysml/runtime/compress/utils/IntArrayList.java
new file mode 100644
index 0000000..33455a2
--- /dev/null
+++ b/src/main/java/org/apache/sysml/runtime/compress/utils/IntArrayList.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.compress.utils;
+
+import java.util.Arrays;
+
+/**
+ * This class provides a memory-efficient replacement for ArrayList<Integer> for
+ * restricted use cases.
+ * 
+ */
+public class IntArrayList 
+{
+       private static final int INIT_CAPACITY = 4;
+       private static final int RESIZE_FACTOR = 2;
+
+       private int[] _data = null;
+       private int _size = -1;
+       private int _val0 = -1;
+
+       public IntArrayList() {
+               _data = null;
+               _size = 0;
+       }
+
+       /**
+        * 
+        * @return
+        */
+       public int size() {
+               return _size;
+       }
+
+       /**
+        * 
+        * @param value
+        */
+       public void appendValue(int value) {
+               // embedded value (no array allocation)
+               if( _size == 0 ) {
+                       _val0 = value;
+                       _size = 1;
+                       return;
+               }
+
+               // allocate or resize array if necessary
+               if( _data == null ) {
+                       _data = new int[INIT_CAPACITY];
+                       _data[0] = _val0;
+               } 
+               else if( _size + 1 >= _data.length ) {
+                       resize();
+               }
+
+               // append value
+               _data[_size] = value;
+               _size++;
+       }
+
+       /**
+        * 
+        * @return
+        */
+       public int[] extractValues() {
+               if( _size == 1 )
+                       return new int[] { _val0 };
+               else
+                       return Arrays.copyOfRange(_data, 0, _size);
+       }
+
+       /**
+        * 
+        */
+       private void resize() {
+               // check for integer overflow on resize
+               if( _data.length > Integer.MAX_VALUE / RESIZE_FACTOR )
+                       throw new RuntimeException(
+                                       "IntArrayList resize leads to integer overflow: size=" + _size);
+
+               // resize data array and copy existing contents
+               int[] newdata = new int[_data.length * RESIZE_FACTOR];
+               System.arraycopy(_data, 0, newdata, 0, _size);
+               _data = newdata;
+       }
+}
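
The first appended value is kept in the embedded _val0 field, so lists with a single offset never allocate an array; a short sketch:

    IntArrayList list = new IntArrayList();
    list.appendValue(7);                 // stored in _val0, no int[] allocated yet
    list.appendValue(11);                // second append allocates the backing array
    int[] vals = list.extractValues();   // {7, 11}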
