This is an automated email from the ASF dual-hosted git repository.

baunsgaard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/main by this push:
     new cb369dfbdb [SYSTEMDS-3592] Compressed Frame Bug fixes
cb369dfbdb is described below

commit cb369dfbdbbbd099dc3107f5ca529f708e819459
Author: baunsgaard <[email protected]>
AuthorDate: Mon Jul 10 18:28:58 2023 +0200

    [SYSTEMDS-3592] Compressed Frame Bug fixes
    
    This commit fixes a number of minor bugs in the size estimation and
    execution of frame compression.
    Also included are fixes to the combine-unique testing for normal
    compressed matrices that did not construct lean dictionaries.
    
    Closes #1859
---
 .../runtime/compress/CompressedMatrixBlock.java    |  10 +
 .../runtime/compress/colgroup/AColGroupValue.java  |   8 +-
 .../runtime/compress/colgroup/ColGroupConst.java   |  11 +-
 .../runtime/compress/colgroup/ColGroupDDC.java     |  21 +-
 .../colgroup/dictionary/DictionaryFactory.java     | 178 ++++++++++++++--
 .../colgroup/dictionary/IdentityDictionary.java    |   6 +-
 .../dictionary/IdentityDictionarySlice.java        |  38 ++--
 .../compress/colgroup/mapping/AMapToData.java      |  13 ++
 .../compress/colgroup/mapping/MapToBit.java        |  17 +-
 .../compress/colgroup/mapping/MapToByte.java       |   7 +
 .../compress/colgroup/mapping/MapToChar.java       |   7 +
 .../compress/colgroup/mapping/MapToCharPByte.java  |  11 +-
 .../compress/colgroup/mapping/MapToInt.java        |   7 +
 .../compress/colgroup/mapping/MapToZero.java       |   7 +
 .../runtime/compress/estim/encoding/AEncode.java   |  28 +++
 .../compress/estim/encoding/ConstEncoding.java     |   7 +-
 .../compress/estim/encoding/DenseEncoding.java     |  16 +-
 .../compress/estim/encoding/EmptyEncoding.java     |   7 +-
 .../runtime/compress/estim/encoding/IEncode.java   |  17 +-
 .../compress/estim/encoding/SparseEncoding.java    |  41 +++-
 .../runtime/compress/lib/CLALibCombineGroups.java  |  51 +++--
 .../runtime/compress/lib/CLALibDecompress.java     |   9 +-
 .../sysds/runtime/compress/lib/CLALibSlice.java    |   2 -
 .../sysds/runtime/compress/lib/CLALibUtils.java    |  15 ++
 .../sysds/runtime/frame/data/FrameBlock.java       |  13 +-
 .../frame/data/columns/ACompressedArray.java       |  16 +-
 .../sysds/runtime/frame/data/columns/Array.java    |  15 +-
 .../runtime/frame/data/columns/ArrayFactory.java   | 131 ++++++++----
 .../runtime/frame/data/columns/BitSetArray.java    |   9 +
 .../runtime/frame/data/columns/BooleanArray.java   |   8 +
 .../runtime/frame/data/columns/CharArray.java      |   8 +
 .../sysds/runtime/frame/data/columns/DDCArray.java |  31 +++
 .../runtime/frame/data/columns/DoubleArray.java    |  11 +-
 .../runtime/frame/data/columns/FloatArray.java     |  11 +-
 .../runtime/frame/data/columns/IntegerArray.java   |  11 +-
 .../runtime/frame/data/columns/LongArray.java      |  11 +-
 .../runtime/frame/data/columns/OptionalArray.java  |  29 ++-
 .../runtime/frame/data/columns/RaggedArray.java    |   5 +
 .../runtime/frame/data/columns/StringArray.java    |   8 +
 .../data/compress/ArrayCompressionStatistics.java  |  11 +-
 .../data/compress/CompressedFrameBlockFactory.java |   5 +-
 .../sysds/runtime/io/FrameReaderBinaryBlock.java   | 130 ++++++------
 .../sysds/runtime/io/MatrixWriterFactory.java      |  11 +-
 .../apache/sysds/runtime/io/WriterBinaryBlock.java |  16 +-
 .../apache/sysds/runtime/util/UtilFunctions.java   |   1 +
 .../compress/combine/CombineEncodings.java         |  88 ++++++++
 .../compress/combine/CombineEncodingsUnique.java   | 193 +++++++++++++++++
 .../component/compress/dictionary/CombineTest.java | 229 ++++++++++++++++++++-
 .../component/compress/mapping/MappingTests.java   |   2 +-
 .../component/frame/FrameSerializationTest.java    |  12 +-
 .../component/frame/array/FrameArrayTests.java     |  17 +-
 .../component/frame/array/NegativeArrayTests.java  |   9 +-
 .../frame/compress/FrameCompressTest.java          |  16 +-
 .../frame/compress/FrameCompressTestUtils.java     |  73 +++++++
 54 files changed, 1398 insertions(+), 266 deletions(-)

diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java 
b/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java
index 7ec6474916..4a1d492856 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java
@@ -95,6 +95,11 @@ public class CompressedMatrixBlock extends MatrixBlock {
        private static final Log LOG = 
LogFactory.getLog(CompressedMatrixBlock.class.getName());
        private static final long serialVersionUID = 73193720143154058L;
 
+       /**
+        * Debugging flag for Compressed Matrices
+        */
+       public static boolean debug = true;
+
        /**
         * Column groups
         */
@@ -928,6 +933,11 @@ public class CompressedMatrixBlock extends MatrixBlock {
                        .getUncompressed(message) : (MatrixBlock) mVal;
        }
 
+       public static MatrixBlock getUncompressed(MatrixValue mVal, String 
message, int k) {
+               return isCompressed((MatrixBlock) mVal) ? 
((CompressedMatrixBlock) mVal).getUncompressed(message,
+                       k) : (MatrixBlock) mVal;
+       }
+
        public MatrixBlock getUncompressed() {
                return getUncompressed((String) null);
        }
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/colgroup/AColGroupValue.java 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/AColGroupValue.java
index 689a1b4337..2b8976cfa4 100644
--- 
a/src/main/java/org/apache/sysds/runtime/compress/colgroup/AColGroupValue.java
+++ 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/AColGroupValue.java
@@ -153,9 +153,13 @@ public abstract class AColGroupValue extends 
ADictBasedColGroup {
 
        @Override
        protected AColGroup sliceMultiColumns(int idStart, int idEnd, IColIndex 
outputCols) {
-               ADictionary retDict = _dict.sliceOutColumnRange(idStart, idEnd, 
_colIndexes.size());
+               final ADictionary retDict = _dict.sliceOutColumnRange(idStart, 
idEnd, _colIndexes.size());
                if(retDict == null)
                        return new ColGroupEmpty(outputCols);
+
+               if(retDict.getNumberOfValues(outputCols.size()) != 
getNumValues())
+                       throw new DMLCompressionException("Invalid Slice Multi 
Columns");
+
                return copyAndSet(outputCols, retDict);
        }
 
@@ -204,7 +208,7 @@ public abstract class AColGroupValue extends 
ADictBasedColGroup {
        }
 
        @Override
-       public void clear(){
+       public void clear() {
                counts = null;
        }
 
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupConst.java 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupConst.java
index e3454886d6..7e99d0e24b 100644
--- 
a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupConst.java
+++ 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupConst.java
@@ -73,6 +73,14 @@ public class ColGroupConst extends ADictBasedColGroup 
implements IContainDefault
        public static AColGroup create(IColIndex colIndices, ADictionary dict) {
                if(dict == null)
                        return new ColGroupEmpty(colIndices);
+               else if(dict.getNumberOfValues(colIndices.size()) > 1) {
+                       // extract dict first row
+                       final double[] nd = new double[colIndices.size()];
+                       for(int i = 0; i < colIndices.size(); i++)
+                               nd[i] = dict.getValue(i);
+
+                       return ColGroupConst.create(colIndices, nd);
+               }
                else
                        return new ColGroupConst(colIndices, dict);
        }
@@ -578,7 +586,8 @@ public class ColGroupConst extends ADictBasedColGroup 
implements IContainDefault
        @Override
        public CompressedSizeInfoColGroup getCompressionInfo(int nRow) {
                EstimationFactors ef = new EstimationFactors(1, 1, 1, 
_dict.getSparsity());
-               return new CompressedSizeInfoColGroup(_colIndexes, ef, 
estimateInMemorySize(), CompressionType.CONST, getEncoding());
+               return new CompressedSizeInfoColGroup(_colIndexes, ef, 
estimateInMemorySize(), CompressionType.CONST,
+                       getEncoding());
        }
 
        @Override
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupDDC.java 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupDDC.java
index 49e234a293..3882fc2999 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupDDC.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupDDC.java
@@ -25,6 +25,9 @@ import java.io.IOException;
 import java.util.Arrays;
 
 import org.apache.commons.lang3.NotImplementedException;
+import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.compress.CompressedMatrixBlock;
+import org.apache.sysds.runtime.compress.DMLCompressionException;
 import org.apache.sysds.runtime.compress.colgroup.dictionary.ADictionary;
 import org.apache.sysds.runtime.compress.colgroup.dictionary.Dictionary;
 import org.apache.sysds.runtime.compress.colgroup.dictionary.DictionaryFactory;
@@ -62,6 +65,16 @@ public class ColGroupDDC extends APreAgg implements 
IMapToDataGroup {
        private ColGroupDDC(IColIndex colIndexes, ADictionary dict, AMapToData 
data, int[] cachedCounts) {
                super(colIndexes, dict, cachedCounts);
                _data = data;
+
+               if(CompressedMatrixBlock.debug) {
+                       if(data.getUnique() != 
dict.getNumberOfValues(colIndexes.size()))
+                               throw new DMLCompressionException("Invalid map 
to dict Map has:" + data.getUnique() + " while dict has "
+                                       + 
dict.getNumberOfValues(colIndexes.size()) );
+                       int[] c = getCounts();
+                       if(c.length != 
dict.getNumberOfValues(colIndexes.size()))
+                               throw new DMLCompressionException("Invalid DDC 
Construction");
+               }
+
        }
 
        public static AColGroup create(IColIndex colIndexes, ADictionary dict, 
AMapToData data, int[] cachedCounts) {
@@ -490,8 +503,12 @@ public class ColGroupDDC extends APreAgg implements 
IMapToDataGroup {
 
        @Override
        public AColGroup sliceRows(int rl, int ru) {
-               AMapToData sliceMap = _data.slice(rl, ru);
-               return new ColGroupDDC(_colIndexes, _dict, sliceMap, null);
+               try {
+                       return ColGroupDDC.create(_colIndexes, _dict, 
_data.slice(rl, ru), null);
+               }
+               catch(Exception e) {
+                       throw new DMLRuntimeException("Failed to slice out sub 
part DDC: " + rl + " " + ru, e);
+               }
        }
 
        @Override
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/colgroup/dictionary/DictionaryFactory.java
 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/dictionary/DictionaryFactory.java
index 806ac9ab53..3228e0b19d 100644
--- 
a/src/main/java/org/apache/sysds/runtime/compress/colgroup/dictionary/DictionaryFactory.java
+++ 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/dictionary/DictionaryFactory.java
@@ -259,17 +259,17 @@ public interface DictionaryFactory {
                        ADictionary bd = ((IContainADictionary) 
b).getDictionary();
                        if(ac.isConst()) {
                                if(bc.isConst()) {
-                                       return new 
Dictionary(CLALibCombineGroups.constructDefaultTuple(a, b));
+                                       return 
Dictionary.create(CLALibCombineGroups.constructDefaultTuple(a, b));
                                }
                                else if(bc.isDense()) {
                                        final double[] at = 
((IContainDefaultTuple) a).getDefaultTuple();
-                                       return combineConstSparseSparseRet(at, 
bd, b.getNumCols());
+                                       return combineConstSparseSparseRet(at, 
bd, b.getNumCols(), filter);
                                }
                        }
                        else if(ac.isDense()) {
                                if(bc.isConst()) {
                                        final double[] bt = 
((IContainDefaultTuple) b).getDefaultTuple();
-                                       return combineSparseConstSparseRet(ad, 
a.getNumCols(), bt);
+                                       return combineSparseConstSparseRet(ad, 
a.getNumCols(), bt, filter);
                                }
                                else if(bc.isDense())
                                        return combineFullDictionaries(ad, 
a.getNumCols(), bd, b.getNumCols(), filter);
@@ -286,7 +286,7 @@ public interface DictionaryFactory {
                                }
                        }
                }
-               throw new NotImplementedException("Not supporting combining 
dense: " + a + " " + b);
+               throw new NotImplementedException("Not supporting combining: " 
+ a + " " + b);
        }
 
        /**
@@ -395,11 +395,7 @@ public interface DictionaryFactory {
        }
 
        public static ADictionary combineSDCRight(ADictionary a, int nca, 
ADictionary b, double[] tub) {
-               return combineSDCRight(a, nca, b, tub, null);
-       }
 
-       public static ADictionary combineSDCRight(ADictionary a, int nca, 
ADictionary b, double[] tub,
-               Map<Integer, Integer> filter) {
                final int ncb = tub.length;
                final int ra = a.getNumberOfValues(nca);
                final int rb = b.getNumberOfValues(ncb);
@@ -416,7 +412,7 @@ public interface DictionaryFactory {
                        for(int c = 0; c < nca; c++)
                                out.quickSetValue(r, c, ma.quickGetValue(r, c));
                        for(int c = 0; c < ncb; c++)
-                               out.quickSetValue(0, c + nca, tub[c]);
+                               out.quickSetValue(r, c + nca, tub[c]);
                }
 
                for(int r = ra; r < out.getNumRows(); r++) {
@@ -432,14 +428,51 @@ public interface DictionaryFactory {
                return new MatrixBlockDictionary(out);
        }
 
-       public static ADictionary combineSDC(ADictionary a, double[] tua, 
ADictionary b, double[] tub) {
-               return combineSDC(a, tua, b, tub, null);
+       public static ADictionary combineSDCRight(ADictionary a, int nca, 
ADictionary b, double[] tub,
+               Map<Integer, Integer> filter) {
+               if(filter == null)
+                       return combineSDCRight(a, nca, b, tub);
+               final int ncb = tub.length;
+               final int ra = a.getNumberOfValues(nca);
+               final int rb = b.getNumberOfValues(ncb);
+
+               MatrixBlock ma = a.getMBDict(nca).getMatrixBlock();
+               MatrixBlock mb = b.getMBDict(ncb).getMatrixBlock();
+
+               MatrixBlock out = new MatrixBlock(filter.size(), nca + ncb, 
false);
+
+               out.allocateBlock();
+
+               for(int r = 0; r < ra; r++) {
+                       if(filter.containsKey(r)) {
+
+                               int o = filter.get(r);
+                               for(int c = 0; c < nca; c++)
+                                       out.quickSetValue(o, c, 
ma.quickGetValue(r, c));
+                               for(int c = 0; c < ncb; c++)
+                                       out.quickSetValue(o, c + nca, tub[c]);
+                       }
+
+               }
+
+               for(int r = ra; r <  ra * rb; r++) {
+                       if(filter.containsKey(r)) {
+                               int o = filter.get(r);
+
+                               int ia = r % ra;
+                               int ib = r / ra - 1;
+                               for(int c = 0; c < nca; c++) // all good.
+                                       out.quickSetValue(o, c, 
ma.quickGetValue(ia, c));
+
+                               for(int c = 0; c < ncb; c++)
+                                       out.quickSetValue(o, c + nca, 
mb.quickGetValue(ib, c));
+
+                       }
+               }
+               return new MatrixBlockDictionary(out);
        }
 
-       public static ADictionary combineSDC(ADictionary a, double[] tua, 
ADictionary b, double[] tub,
-               Map<Integer, Integer> filter) {
-               if(filter != null)
-                       throw new NotImplementedException();
+       public static ADictionary combineSDC(ADictionary a, double[] tua, 
ADictionary b, double[] tub) {
                final int nca = tua.length;
                final int ncb = tub.length;
                final int ra = a.getNumberOfValues(nca);
@@ -487,6 +520,67 @@ public interface DictionaryFactory {
                return new MatrixBlockDictionary(out);
        }
 
+       public static ADictionary combineSDC(ADictionary a, double[] tua, 
ADictionary b, double[] tub,
+               Map<Integer, Integer> filter) {
+               if(filter == null)
+                       return combineSDC(a, tua, b, tub);
+               final int nca = tua.length;
+               final int ncb = tub.length;
+               final int ra = a.getNumberOfValues(nca);
+               final int rb = b.getNumberOfValues(nca);
+
+               MatrixBlock ma = a.getMBDict(nca).getMatrixBlock();
+               MatrixBlock mb = b.getMBDict(ncb).getMatrixBlock();
+
+               MatrixBlock out = new MatrixBlock(filter.size(), nca + ncb, 
false);
+
+               out.allocateBlock();
+
+               // 0 row both default tuples
+               if(filter.containsKey(0)) {
+                       int o = filter.get(0);
+                       for(int c = 0; c < nca; c++)
+                               out.quickSetValue(o, c, tua[c]);
+
+                       for(int c = 0; c < ncb; c++)
+                               out.quickSetValue(o, c + nca, tub[c]);
+               }
+
+               // default case for b and all cases for a.
+               for(int r = 1; r < ra + 1; r++) {
+                       if(filter.containsKey(r)) {
+                               int o = filter.get(r);
+                               for(int c = 0; c < nca; c++)
+                                       out.quickSetValue(o, c, 
ma.quickGetValue(r - 1, c));
+                               for(int c = 0; c < ncb; c++)
+                                       out.quickSetValue(o, c + nca, tub[c]);
+                       }
+               }
+
+               for(int r = ra + 1; r < ra * rb; r++) {
+
+                       if(filter.containsKey(r)) {
+                               int o = filter.get(r);
+
+                               int ia = r % (ra + 1) - 1;
+                               int ib = r / (ra + 1) - 1;
+
+                               if(ia == -1)
+                                       for(int c = 0; c < nca; c++)
+                                               out.quickSetValue(o, c, tua[c]);
+                               else
+                                       for(int c = 0; c < nca; c++)
+                                               out.quickSetValue(o, c, 
ma.quickGetValue(ia, c));
+
+                               for(int c = 0; c < ncb; c++) // all good here.
+                                       out.quickSetValue(o, c + nca, 
mb.quickGetValue(ib, c));
+                       }
+               }
+
+               return new MatrixBlockDictionary(out);
+
+       }
+
        public static ADictionary combineSparseConstSparseRet(ADictionary a, 
int nca, double[] tub) {
                final int ncb = tub.length;
                final int ra = a.getNumberOfValues(nca);
@@ -509,6 +603,33 @@ public interface DictionaryFactory {
 
        }
 
+       private static ADictionary combineSparseConstSparseRet(ADictionary a, 
int nca, double[] tub,
+               Map<Integer, Integer> filter) {
+               if(filter == null)
+                       return combineSparseConstSparseRet(a, nca, tub);
+               else
+                       throw new NotImplementedException();
+               // final int ncb = tub.length;
+               // final int ra = a.getNumberOfValues(nca);
+
+               // MatrixBlock ma = a.getMBDict(nca).getMatrixBlock();
+
+               // MatrixBlock out = new MatrixBlock(ra, nca + ncb, false);
+
+               // out.allocateBlock();
+
+               // // default case for b and all cases for a.
+               // for(int r = 0; r < ra; r++) {
+               // for(int c = 0; c < nca; c++)
+               // out.quickSetValue(r, c, ma.quickGetValue(r, c));
+               // for(int c = 0; c < ncb; c++)
+               // out.quickSetValue(r, c + nca, tub[c]);
+               // }
+
+               // return new MatrixBlockDictionary(out);
+
+       }
+
        public static ADictionary combineConstSparseSparseRet(double[] tua, 
ADictionary b, int ncb) {
                final int nca = tua.length;
                final int rb = b.getNumberOfValues(ncb);
@@ -530,4 +651,31 @@ public interface DictionaryFactory {
                return new MatrixBlockDictionary(out);
 
        }
+
+       private static ADictionary combineConstSparseSparseRet(double[] tua, 
ADictionary b, int ncb,
+               Map<Integer, Integer> filter) {
+               if(filter == null)
+                       return combineConstSparseSparseRet(tua, b, ncb);
+               else
+                       throw new NotImplementedException();
+               // final int nca = tua.length;
+               // final int rb = b.getNumberOfValues(ncb);
+
+               // MatrixBlock mb = b.getMBDict(ncb).getMatrixBlock();
+
+               // MatrixBlock out = new MatrixBlock(rb, nca + ncb, false);
+
+               // out.allocateBlock();
+
+               // // default case for b and all cases for a.
+               // for(int r = 0; r < rb; r++) {
+               // for(int c = 0; c < nca; c++)
+               // out.quickSetValue(r, c, tua[c]);
+               // for(int c = 0; c < ncb; c++)
+               // out.quickSetValue(r, c + nca, mb.quickGetValue(r, c));
+               // }
+
+               // return new MatrixBlockDictionary(out);
+
+       }
 }
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/colgroup/dictionary/IdentityDictionary.java
 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/dictionary/IdentityDictionary.java
index 80a21cfd8f..ca740c1a95 100644
--- 
a/src/main/java/org/apache/sysds/runtime/compress/colgroup/dictionary/IdentityDictionary.java
+++ 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/dictionary/IdentityDictionary.java
@@ -233,7 +233,7 @@ public class IdentityDictionary extends ADictionary {
 
        @Override
        public ADictionary clone() {
-               return new IdentityDictionary(nRowCol);
+               return new IdentityDictionary(nRowCol, withEmpty);
        }
 
        @Override
@@ -357,9 +357,9 @@ public class IdentityDictionary extends ADictionary {
        @Override
        public ADictionary sliceOutColumnRange(int idxStart, int idxEnd, int 
previousNumberOfColumns) {
                if(idxStart == 0 && idxEnd == nRowCol)
-                       return new IdentityDictionary(nRowCol);
+                       return new IdentityDictionary(nRowCol, withEmpty);
                else
-                       return new IdentityDictionarySlice(nRowCol, idxStart, 
idxEnd);
+                       return new IdentityDictionarySlice(nRowCol, withEmpty, 
idxStart, idxEnd);
        }
 
        @Override
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/colgroup/dictionary/IdentityDictionarySlice.java
 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/dictionary/IdentityDictionarySlice.java
index d25bec9829..6c072a2e13 100644
--- 
a/src/main/java/org/apache/sysds/runtime/compress/colgroup/dictionary/IdentityDictionarySlice.java
+++ 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/dictionary/IdentityDictionarySlice.java
@@ -42,12 +42,13 @@ public class IdentityDictionarySlice extends 
IdentityDictionary {
         * Create a Identity matrix dictionary slice. It behaves as if 
allocated a Sparse Matrix block but exploits that the
         * structure is known to have certain properties.
         * 
-        * @param nRowCol the number of rows and columns in this identity 
matrix.
-        * @param l       the index lower to start at
-        * @param u       the index upper to end at (not inclusive)
+        * @param nRowCol   the number of rows and columns in this identity 
matrix.
+        * @param withEmpty If the matrix should contain an empty row in the 
end.
+        * @param l         the index lower to start at
+        * @param u         the index upper to end at (not inclusive)
         */
-       public IdentityDictionarySlice(int nRowCol, int l, int u) {
-               super(nRowCol);
+       public IdentityDictionarySlice(int nRowCol, boolean withEmpty, int l, 
int u) {
+               super(nRowCol, withEmpty);
                if(u > nRowCol || l < 0 || l >= u)
                        throw new DMLRuntimeException("Invalid slice Identity: 
" + nRowCol + " range: " + l + "--" + u);
                this.l = l;
@@ -68,18 +69,13 @@ public class IdentityDictionarySlice extends 
IdentityDictionary {
        @Override
        public double getValue(int i) {
                throw new NotImplementedException();
-               // final int nCol = nRowCol;
-               // final int row = i / nCol;
-               // if(row > nRowCol)
-               // return 0;
-               // final int col = i % nCol;
-               // return row == col ? 1 : 0;
+
        }
 
        @Override
        public final double getValue(int r, int c, int nCol) {
                throw new NotImplementedException();
-               // return r == c ? 1 : 0;
+
        }
 
        @Override
@@ -110,7 +106,7 @@ public class IdentityDictionarySlice extends 
IdentityDictionary {
 
        @Override
        public ADictionary clone() {
-               return new IdentityDictionarySlice(nRowCol, l, u);
+               return new IdentityDictionarySlice(nRowCol, withEmpty, l, u);
        }
 
        @Override
@@ -118,11 +114,6 @@ public class IdentityDictionarySlice extends 
IdentityDictionary {
                return DictType.IdentitySlice;
        }
 
-       @Override
-       public int getNumberOfValues(int ncol) {
-               return nRowCol;
-       }
-
        @Override
        public double[] sumAllRowsToDouble(int nrColumns) {
                double[] ret = new double[nRowCol];
@@ -200,7 +191,6 @@ public class IdentityDictionarySlice extends 
IdentityDictionary {
                return sum(counts, ncol);
        }
 
-
        @Override
        public ADictionary sliceOutColumnRange(int idxStart, int idxEnd, int 
previousNumberOfColumns) {
                throw new NotImplementedException("Slice of identity slice ??? 
this is getting a bit ridiculous");
@@ -211,7 +201,6 @@ public class IdentityDictionarySlice extends 
IdentityDictionary {
                return pattern == 0.0 || pattern == 1.0;
        }
 
-
        @Override
        public long getNumberNonZeros(int[] counts, int nCol) {
                return (long) sum(counts, nCol);
@@ -229,11 +218,10 @@ public class IdentityDictionarySlice extends 
IdentityDictionary {
                return ret;
        }
 
-
        private MatrixBlockDictionary createMBDict() {
                MatrixBlock identity = new MatrixBlock(nRowCol, u - l, true);
                for(int i = l; i < u; i++)
-                       identity.quickSetValue(i, i-l, 1.0);
+                       identity.quickSetValue(i, i - l, 1.0);
                return new MatrixBlockDictionary(identity);
        }
 
@@ -266,7 +254,7 @@ public class IdentityDictionarySlice extends 
IdentityDictionary {
 
        @Override
        public long getExactSizeOnDisk() {
-               return 1 + 4 *3;
+               return 1 + 4 * 3;
        }
 
        @Override
@@ -292,9 +280,9 @@ public class IdentityDictionarySlice extends 
IdentityDictionary {
 
        @Override
        public boolean equals(ADictionary o) {
-               if(o instanceof IdentityDictionarySlice){
+               if(o instanceof IdentityDictionarySlice) {
                        IdentityDictionarySlice os = ((IdentityDictionarySlice) 
o);
-                       return os.nRowCol == nRowCol &&  os.l == l && os.u == u;
+                       return os.nRowCol == nRowCol && os.l == l && os.u == u;
                }
                else if(o instanceof IdentityDictionary)
                        return false;
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/colgroup/mapping/AMapToData.java
 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/mapping/AMapToData.java
index 6c41f68259..c8c7a94f21 100644
--- 
a/src/main/java/org/apache/sysds/runtime/compress/colgroup/mapping/AMapToData.java
+++ 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/mapping/AMapToData.java
@@ -840,6 +840,19 @@ public abstract class AMapToData implements Serializable {
 
        public abstract AMapToData appendN(IMapToDataGroup[] d);
 
+       @Override
+       public boolean equals(Object e) {
+               return e instanceof AMapToData && (this == e || 
this.equals((AMapToData) e));
+       }
+
+       /**
+        * Indicate if the given encoding is equivalent to this encoding
+        * 
+        * @param e The other encoding to be compared with this
+        * @return If the encoding is equivalent
+        */
+       public abstract boolean equals(AMapToData e);
+
        @Override
        public String toString() {
                final int sz = size();
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/colgroup/mapping/MapToBit.java
 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/mapping/MapToBit.java
index 55152efdf8..23eccaef80 100644
--- 
a/src/main/java/org/apache/sysds/runtime/compress/colgroup/mapping/MapToBit.java
+++ 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/mapping/MapToBit.java
@@ -35,6 +35,7 @@ public class MapToBit extends AMapToData {
 
        private static final long serialVersionUID = -8065234231282619923L;
 
+       // TODO use custom BitSet
        private final BitSet _data;
        private final int _size;
 
@@ -80,7 +81,7 @@ public class MapToBit extends AMapToData {
 
        @Override
        public long getInMemorySize() {
-               return getInMemorySize(_data.size()-1);
+               return getInMemorySize(_data.size() - 1);
        }
 
        public static long getInMemorySize(int dataLength) {
@@ -326,10 +327,10 @@ public class MapToBit extends AMapToData {
 
        @Override
        public AMapToData slice(int l, int u) {
-               BitSet s = _data.get(l,u);
+               BitSet s = _data.get(l, u);
                if(s.isEmpty())
-                       return new MapToZero(u-l);
-               else 
+                       return new MapToZero(u - l);
+               else
                        return new MapToBit(getUnique(), s, u - l);
        }
 
@@ -351,6 +352,14 @@ public class MapToBit extends AMapToData {
                }
        }
 
+       @Override
+       public boolean equals(AMapToData e) {
+               if(!(e instanceof MapToBit))
+                       return false;
+               final MapToBit other = (MapToBit) e;
+               // BitSet equality only looks at the set bits, so the logical size is compared separately
+               return other.getUnique() == getUnique() //
+                       && other._size == _size //
+                       && other._data.equals(_data);
+       }
+
        @Override
        public AMapToData appendN(IMapToDataGroup[] d) {
                int p = 0; // pointer
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/colgroup/mapping/MapToByte.java
 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/mapping/MapToByte.java
index 1a7907e5f8..b2d8623eaf 100644
--- 
a/src/main/java/org/apache/sysds/runtime/compress/colgroup/mapping/MapToByte.java
+++ 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/mapping/MapToByte.java
@@ -255,4 +255,11 @@ public class MapToByte extends AMapToData {
                else
                        return new MapToByte(getUnique(), ret);
        }
+
+       @Override
+       public boolean equals(AMapToData e) {
+               if(!(e instanceof MapToByte))
+                       return false;
+               // same concrete type: compare the unique count first, then the raw byte indexes
+               final MapToByte other = (MapToByte) e;
+               return other.getUnique() == getUnique() && Arrays.equals(other._data, _data);
+       }
 }
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/colgroup/mapping/MapToChar.java
 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/mapping/MapToChar.java
index c7dd5182a7..3b3f1d5d3d 100644
--- 
a/src/main/java/org/apache/sysds/runtime/compress/colgroup/mapping/MapToChar.java
+++ 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/mapping/MapToChar.java
@@ -270,4 +270,11 @@ public class MapToChar extends AMapToData {
 
                return new MapToChar(getUnique(), ret);
        }
+
+       @Override
+       public boolean equals(AMapToData e) {
+               if(!(e instanceof MapToChar))
+                       return false;
+               // same concrete type: compare the unique count first, then the raw char indexes
+               final MapToChar other = (MapToChar) e;
+               return other.getUnique() == getUnique() && Arrays.equals(other._data, _data);
+       }
 }
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/colgroup/mapping/MapToCharPByte.java
 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/mapping/MapToCharPByte.java
index e76aafed69..611ed46e00 100644
--- 
a/src/main/java/org/apache/sysds/runtime/compress/colgroup/mapping/MapToCharPByte.java
+++ 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/mapping/MapToCharPByte.java
@@ -235,7 +235,6 @@ public class MapToCharPByte extends AMapToData {
                        byte[] ret_b = Arrays.copyOf(_data_b, newSize);
                        System.arraycopy(tbbb, 0, ret_b, _data_b.length, 
t.size());
 
-
                        return new MapToCharPByte(newDistinct, ret_c, ret_b);
                }
                else {
@@ -244,7 +243,15 @@ public class MapToCharPByte extends AMapToData {
        }
 
        @Override
-       public AMapToData appendN(IMapToDataGroup[] d){
+       public AMapToData appendN(IMapToDataGroup[] d) {
                throw new NotImplementedException();
        }
+
+       @Override
+       public boolean equals(AMapToData e) {
+               if(!(e instanceof MapToCharPByte))
+                       return false;
+               final MapToCharPByte other = (MapToCharPByte) e;
+               // both backing arrays have to match: the byte part is checked first, then the char part
+               return other.getUnique() == getUnique() //
+                       && Arrays.equals(other._data_b, _data_b) //
+                       && Arrays.equals(other._data_c, _data_c);
+       }
 }
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/colgroup/mapping/MapToInt.java
 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/mapping/MapToInt.java
index acad7ee460..d5e83c4a24 100644
--- 
a/src/main/java/org/apache/sysds/runtime/compress/colgroup/mapping/MapToInt.java
+++ 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/mapping/MapToInt.java
@@ -271,4 +271,11 @@ public class MapToInt extends AMapToData {
 
                return new MapToInt(getUnique(), ret);
        }
+
+       @Override
+       public boolean equals(AMapToData e) {
+               if(!(e instanceof MapToInt))
+                       return false;
+               // same concrete type: compare the unique count first, then the raw int indexes
+               final MapToInt other = (MapToInt) e;
+               return other.getUnique() == getUnique() && Arrays.equals(other._data, _data);
+       }
 }
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/colgroup/mapping/MapToZero.java
 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/mapping/MapToZero.java
index bdc13b3bac..d15cf09953 100644
--- 
a/src/main/java/org/apache/sysds/runtime/compress/colgroup/mapping/MapToZero.java
+++ 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/mapping/MapToZero.java
@@ -167,4 +167,11 @@ public class MapToZero extends AMapToData {
                        p += gd.getMapToData().size();
                return new MapToZero(p);
        }
+
+       @Override
+       public boolean equals(AMapToData e) {
+               if(!(e instanceof MapToZero))
+                       return false;
+               // no index data is compared here; the unique count and the size decide
+               final MapToZero other = (MapToZero) e;
+               return other.getUnique() == getUnique() && _size == other._size;
+       }
 }
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/estim/encoding/AEncode.java 
b/src/main/java/org/apache/sysds/runtime/compress/estim/encoding/AEncode.java
new file mode 100644
index 0000000000..c4c1943da5
--- /dev/null
+++ 
b/src/main/java/org/apache/sysds/runtime/compress/estim/encoding/AEncode.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.compress.estim.encoding;
+
+/**
+ * Abstract base class of the encodings that routes {@link Object#equals(Object)} to the typed
+ * {@link IEncode#equals(IEncode)} comparison implemented by the concrete encodings.
+ */
+public abstract class AEncode implements IEncode {
+
+       /**
+        * Generic equality for encodings.
+        * 
+        * @param e The object to compare with this encoding
+        * @return true if e is this very object, or an encoding the typed comparison reports as equivalent
+        */
+       @Override
+       public boolean equals(Object e) {
+               // identical references are trivially equal, so the typed comparison is skipped
+               if(this == e)
+                       return true;
+               return e instanceof IEncode && this.equals((IEncode) e);
+       }
+}
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/estim/encoding/ConstEncoding.java
 
b/src/main/java/org/apache/sysds/runtime/compress/estim/encoding/ConstEncoding.java
index 1a120772b8..b0fe390f61 100644
--- 
a/src/main/java/org/apache/sysds/runtime/compress/estim/encoding/ConstEncoding.java
+++ 
b/src/main/java/org/apache/sysds/runtime/compress/estim/encoding/ConstEncoding.java
@@ -27,7 +27,7 @@ import org.apache.sysds.runtime.compress.CompressionSettings;
 import org.apache.sysds.runtime.compress.estim.EstimationFactors;
 
 /** Const encoding for cases where the entire group of columns is the same 
value */
-public class ConstEncoding implements IEncode {
+public class ConstEncoding extends AEncode {
 
        private final int[] counts;
 
@@ -67,4 +67,9 @@ public class ConstEncoding implements IEncode {
        public boolean isDense() {
                return true;
        }
+
+       @Override
+       public boolean equals(IEncode e) {
+               if(!(e instanceof ConstEncoding))
+                       return false;
+               // NOTE(review): only the lengths of the count arrays are compared, not their contents —
+               // confirm that this is the intended notion of equivalence.
+               final ConstEncoding other = (ConstEncoding) e;
+               return other.counts.length == this.counts.length;
+       }
 }
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/estim/encoding/DenseEncoding.java
 
b/src/main/java/org/apache/sysds/runtime/compress/estim/encoding/DenseEncoding.java
index f68dd3d674..db1905eccc 100644
--- 
a/src/main/java/org/apache/sysds/runtime/compress/estim/encoding/DenseEncoding.java
+++ 
b/src/main/java/org/apache/sysds/runtime/compress/estim/encoding/DenseEncoding.java
@@ -33,7 +33,7 @@ import 
org.apache.sysds.runtime.compress.estim.EstimationFactors;
 /**
  * An Encoding that contains a value on each row of the input.
  */
-public class DenseEncoding implements IEncode {
+public class DenseEncoding extends AEncode {
 
        private final AMapToData map;
 
@@ -153,8 +153,13 @@ public class DenseEncoding implements IEncode {
        }
 
        private Pair<IEncode, Map<Integer, Integer>> combineDenseNoResize(final 
DenseEncoding other) {
-               if(map == other.map)
-                       return new ImmutablePair<>(this, null); // same object
+               if(map == other.map) {
+                       // Both sides share the very same map object: return this encoding unchanged together
+                       // with an explicitly constructed index map instead of combining row by row.
+                       LOG.warn("Constructing perfect mapping, this could be 
optimized to skip hashmap");
+                       final Map<Integer, Integer> m = new 
HashMap<>(map.size());
+                       // NOTE(review): the key is the square of the index (i * i). If the general path keys the
+                       // map on a linear combination of the left and right indexes, the keys produced here
+                       // would not line up with it — confirm against the non-shortcut path.
+                       for(int i = 0; i < map.getUnique(); i++)
+                               m.put(i * i, i);
+                       return new ImmutablePair<>(this, m); // same object
+               }
 
                final AMapToData lm = map;
                final AMapToData rm = other.map;
@@ -246,6 +251,11 @@ public class DenseEncoding implements IEncode {
                return true;
        }
 
+       @Override
+       public boolean equals(IEncode e) {
+               if(!(e instanceof DenseEncoding))
+                       return false;
+               // two dense encodings are equivalent exactly when their maps are
+               final AMapToData otherMap = ((DenseEncoding) e).map;
+               return otherMap.equals(this.map);
+       }
+
        @Override
        public String toString() {
                StringBuilder sb = new StringBuilder();
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/estim/encoding/EmptyEncoding.java
 
b/src/main/java/org/apache/sysds/runtime/compress/estim/encoding/EmptyEncoding.java
index 9e12654c77..0d386f1424 100644
--- 
a/src/main/java/org/apache/sysds/runtime/compress/estim/encoding/EmptyEncoding.java
+++ 
b/src/main/java/org/apache/sysds/runtime/compress/estim/encoding/EmptyEncoding.java
@@ -29,7 +29,7 @@ import 
org.apache.sysds.runtime.compress.estim.EstimationFactors;
 /**
  * Empty encoding for cases where the entire group of columns is zero
  */
-public class EmptyEncoding implements IEncode {
+public class EmptyEncoding extends AEncode  {
 
        // empty constructor
        public EmptyEncoding() {
@@ -67,4 +67,9 @@ public class EmptyEncoding implements IEncode {
        public boolean isDense() {
                return false;
        }
+
+       /** Any other {@link EmptyEncoding} is reported as equivalent; no further state is compared. */
+       @Override
+       public boolean equals(IEncode e) {
+               return e instanceof EmptyEncoding;
+       }
 }
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/estim/encoding/IEncode.java 
b/src/main/java/org/apache/sysds/runtime/compress/estim/encoding/IEncode.java
index e7202a19c4..15393a947b 100644
--- 
a/src/main/java/org/apache/sysds/runtime/compress/estim/encoding/IEncode.java
+++ 
b/src/main/java/org/apache/sysds/runtime/compress/estim/encoding/IEncode.java
@@ -45,13 +45,13 @@ public interface IEncode {
        public IEncode combine(IEncode e);
 
        /**
-        * Combine two encodings without resizing the output. meaning the 
mapping of the indexes should be consistent with 
+        * Combine two encodings without resizing the output. meaning the 
mapping of the indexes should be consistent with
         * left hand side Dictionary indexes and right hand side indexes.
         * 
-        * @param e The other side to combine with 
+        * @param e The other side to combine with
         * @return The combined encoding
         */
-       public Pair<IEncode, Map<Integer,Integer>> combineWithMap(IEncode e);
+       public Pair<IEncode, Map<Integer, Integer>> combineWithMap(IEncode e);
 
        /**
         * Get the number of unique values in this encoding
@@ -78,4 +78,15 @@ public interface IEncode {
         * @return is dense
         */
        public abstract boolean isDense();
+
+       @Override
+       public abstract boolean equals(Object e);
+
+       /**
+        * Indicate if the given encoding is equivalent to this encoding
+        * 
+        * @param e The other encoding to be compared with this
+        * @return If the encoding is equivalent
+        */
+       public abstract boolean equals(IEncode e);
 }
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/estim/encoding/SparseEncoding.java
 
b/src/main/java/org/apache/sysds/runtime/compress/estim/encoding/SparseEncoding.java
index 872e512adf..58ed137d17 100644
--- 
a/src/main/java/org/apache/sysds/runtime/compress/estim/encoding/SparseEncoding.java
+++ 
b/src/main/java/org/apache/sysds/runtime/compress/estim/encoding/SparseEncoding.java
@@ -19,6 +19,7 @@
 
 package org.apache.sysds.runtime.compress.estim.encoding;
 
+import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.commons.lang3.tuple.ImmutablePair;
@@ -37,7 +38,7 @@ import org.apache.sysds.runtime.compress.utils.IntArrayList;
  * A Encoding that contain a default value that is not encoded and every other 
value is encoded in the map. The logic is
  * similar to the SDC column group.
  */
-public class SparseEncoding implements IEncode {
+public class SparseEncoding extends AEncode {
 
        /** A map to the distinct values contained */
        protected final AMapToData map;
@@ -77,7 +78,7 @@ public class SparseEncoding implements IEncode {
                        SparseEncoding es = (SparseEncoding) e;
                        if(es.off == off && es.map == map)
                                return new ImmutablePair<>(this, null);
-                       return new ImmutablePair<>(combineSparseNoResize(es), 
null);
+                       return combineSparseNoResizeDense(es);
                }
                else
                        throw new DMLCompressionException("Not allowed other to 
be dense");
@@ -121,13 +122,9 @@ public class SparseEncoding implements IEncode {
                }
        }
 
-       private IEncode combineSparseNoResize(SparseEncoding e) {
-               // for now just use the dense... and lets continue.
-               // TODO add sparse combine with sparse output.
-               return combineSparseNoResizeDense(e);
-       }
 
-       private IEncode combineSparseNoResizeDense(SparseEncoding e) {
+
+       private Pair<IEncode, Map<Integer, Integer>>  
combineSparseNoResizeDense(SparseEncoding e) {
 
                final int fl = off.getOffsetToLast();
                final int fr = e.off.getOffsetToLast();
@@ -137,6 +134,7 @@ public class SparseEncoding implements IEncode {
                final int nVr = e.getUnique();
 
                final AMapToData retMap = MapToFactory.create(nRows, (nVl + 1) 
* (nVr + 1));
+               
                int il = itl.value();
                // parse through one side set all values into the dense.
                while(il < fl) {
@@ -155,9 +153,27 @@ public class SparseEncoding implements IEncode {
                }
                retMap.set(fr, retMap.getIndex(fr) + 
(e.map.getIndex(itr.getDataIndex()) + 1) * nVl);
 
-               return new DenseEncoding(retMap);
+               // Full iteration to set unique elements.
+               final Map<Integer, Integer> m = new HashMap<>();
+               for(int i = 0 ; i < retMap.size(); i ++)
+                       addValHashMap(retMap.getIndex(i), i,m, retMap );
+               
+
+               return new ImmutablePair<>(new 
DenseEncoding(retMap.resize(m.size())), m);
+               
+       }
+
+
+       /**
+        * Write the id associated with the given value into the row of the mapping. A value that is not yet
+        * contained in the map is registered with the current size of the map as its id.
+        * 
+        * @param nv  The value to look up, or register, in the map
+        * @param r   The row of d to write to
+        * @param map The map from values to ids, extended when nv is not contained yet
+        * @param d   The mapping that receives the id at row r
+        */
+       protected static void addValHashMap(final int nv, final int r, final Map<Integer, Integer> map,
+               final AMapToData d) {
+               final int nextId = map.size();
+               final Integer known = map.putIfAbsent(nv, nextId);
+               // putIfAbsent returns null exactly when nv was newly registered under nextId
+               d.set(r, known == null ? nextId : known);
        }
 
+
        private static int combineSparse(AMapToData lMap, AMapToData rMap, 
AIterator itl, AIterator itr,
                final IntArrayList retOff, final IntArrayList tmpVals, final 
int fl, final int fr, final int nVl, final int nVr,
                final int[] d) {
@@ -394,6 +410,13 @@ public class SparseEncoding implements IEncode {
                return nRows;
        }
 
+       @Override
+       public boolean equals(IEncode e) {
+               if(!(e instanceof SparseEncoding))
+                       return false;
+               // the offsets are compared first, the map only when the offsets agree
+               final SparseEncoding other = (SparseEncoding) e;
+               return other.off.equals(this.off) && other.map.equals(this.map);
+       }
+
        @Override
        public String toString() {
                StringBuilder sb = new StringBuilder();
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibCombineGroups.java 
b/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibCombineGroups.java
index 6c27c90f58..801ef893dc 100644
--- 
a/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibCombineGroups.java
+++ 
b/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibCombineGroups.java
@@ -87,7 +87,6 @@ public final class CLALibCombineGroups {
                if(filterFor)
                        input = CLALibUtils.filterFOR(input, c);
 
-
                List<List<AColGroup>> combinations = new ArrayList<>();
                for(CompressedSizeInfoColGroup gi : csi.getInfo()) {
                        combinations.add(findGroupsInIndex(gi.getColumns(), 
input));
@@ -98,10 +97,8 @@ public final class CLALibCombineGroups {
                        for(List<AColGroup> combine : combinations)
                                ret.add(combineN(combine).addVector(c));
                else
-                       for(List<AColGroup> combine : combinations) 
+                       for(List<AColGroup> combine : combinations)
                                ret.add(combineN(combine));
-               
-
 
                return ret;
        }
@@ -138,26 +135,37 @@ public final class CLALibCombineGroups {
         * @return A new column group containing the two.
         */
        public static AColGroup combine(AColGroup a, AColGroup b) {
-               if(a instanceof IFrameOfReferenceGroup || b instanceof 
IFrameOfReferenceGroup)
-                       throw new DMLCompressionException("Invalid call with 
frame of reference group to combine");
+               try {
 
-               IColIndex combinedColumns = ColIndexFactory.combine(a, b);
+                       if(a instanceof IFrameOfReferenceGroup || b instanceof 
IFrameOfReferenceGroup)
+                               throw new DMLCompressionException("Invalid call 
with frame of reference group to combine");
 
-               // try to recompress a and b if uncompressed
-               if(a instanceof ColGroupUncompressed)
-                       a = a.recompress();
+                       IColIndex combinedColumns = ColIndexFactory.combine(a, 
b);
 
-               if(b instanceof ColGroupUncompressed)
-                       b = b.recompress();
+                       // try to recompress a and b if uncompressed
+                       if(a instanceof ColGroupUncompressed)
+                               a = a.recompress();
 
-               if(a instanceof AColGroupCompressed && b instanceof 
AColGroupCompressed)
-                       return combineCompressed(combinedColumns, 
(AColGroupCompressed) a, (AColGroupCompressed) b);
-               else if(a instanceof ColGroupUncompressed || b instanceof 
ColGroupUncompressed)
-                       // either side is uncompressed
-                       return combineUC(combinedColumns, a, b);
+                       if(b instanceof ColGroupUncompressed)
+                               b = b.recompress();
 
-               throw new NotImplementedException(
-                       "Not implemented combine for " + 
a.getClass().getSimpleName() + " - " + b.getClass().getSimpleName());
+                       if(a instanceof AColGroupCompressed && b instanceof 
AColGroupCompressed)
+                               return combineCompressed(combinedColumns, 
(AColGroupCompressed) a, (AColGroupCompressed) b);
+                       else if(a instanceof ColGroupUncompressed || b 
instanceof ColGroupUncompressed)
+                               // either side is uncompressed
+                               return combineUC(combinedColumns, a, b);
+
+                       throw new NotImplementedException(
+                               "Not implemented combine for " + 
a.getClass().getSimpleName() + " - " + b.getClass().getSimpleName());
+               }
+               catch(Exception e) {
+                       StringBuilder sb = new StringBuilder();
+                       sb.append("Failed to combine:\n\n");
+                       sb.append(a);
+                       sb.append("\n\n");
+                       sb.append(b);
+                       throw new DMLCompressionException(sb.toString(), e);
+               }
 
        }
 
@@ -169,10 +177,11 @@ public final class CLALibCombineGroups {
                        // the order must be sparse second unless both sparse.
                        return combineCompressed(combinedColumns, bc, ac);
                }
+               // add if encodings are equal make shortcut.
 
-               Pair<IEncode, Map<Integer,Integer>> cec = ae.combineWithMap(be);
+               Pair<IEncode, Map<Integer, Integer>> cec = 
ae.combineWithMap(be);
                IEncode ce = cec.getLeft();
-               Map<Integer,Integer> filter = cec.getRight();
+               Map<Integer, Integer> filter = cec.getRight();
                if(ce instanceof DenseEncoding) {
                        DenseEncoding ced = (DenseEncoding) (ce);
                        ADictionary cd = 
DictionaryFactory.combineDictionaries(ac, bc, filter);
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibDecompress.java 
b/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibDecompress.java
index f46665e60d..35656f2ea2 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibDecompress.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibDecompress.java
@@ -152,7 +152,7 @@ public final class CLALibDecompress {
                        return ret; // if uncompressedColGroup is only colGroup.
                }
 
-               final boolean shouldFilter = 
CLALibUtils.shouldPreFilter(groups);
+               final boolean shouldFilter = 
CLALibUtils.shouldPreFilterMorphOrRef(groups);
                double[] constV = shouldFilter ? new double[nCols] : null;
                final List<AColGroup> filteredGroups = shouldFilter ? 
CLALibUtils.filterGroups(groups, constV) : groups;
 
@@ -164,6 +164,10 @@ public final class CLALibDecompress {
                                ret.allocateSparseRowsBlock();
                        else
                                ret.allocateDenseBlock();
+
+                       if(MatrixBlock.evalSparseFormatInMemory(nRows, nCols, 
nonZeros) && !sparse)
+                               LOG.warn("Decompressing into dense but 
reallocating after to sparse: overlapping - " + overlapping
+                                       + ", filter - " + shouldFilter);
                }
 
                final int blklen = Math.max(nRows / k, 512);
@@ -188,9 +192,10 @@ public final class CLALibDecompress {
                else {
                        decompressDenseMultiThread(ret, filteredGroups, nRows, 
blklen, constV, eps, k, overlapping);
                }
-               ret.recomputeNonZeros();
 
+               ret.recomputeNonZeros();
                ret.examSparsity();
+
                return ret;
        }
 
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibSlice.java 
b/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibSlice.java
index 3c1c1b64bb..a99142ec00 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibSlice.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibSlice.java
@@ -78,8 +78,6 @@ public final class CLALibSlice {
        }
 
        private static List<MatrixBlock> 
sliceBlocksMultiThread(CompressedMatrixBlock cmb, int blen, int k) {
-               // final List<MatrixBlock> mbs = new ArrayList<>();
-
                final ExecutorService pool = CommonThreadPool.get(k);
                try {
                        final ArrayList<SliceTask> tasks = new ArrayList<>();
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibUtils.java 
b/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibUtils.java
index eba4bcc678..292df246e8 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibUtils.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibUtils.java
@@ -94,6 +94,19 @@ public final class CLALibUtils {
                return false;
        }
 
+       /**
+        * Check whether at least one of the column groups is a morphing group or a frame of reference group.
+        * 
+        * @param groups The groups to analyze
+        * @return true if any group is an AMorphingMMColGroup or an IFrameOfReferenceGroup, false otherwise
+        */
+       protected static boolean shouldPreFilterMorphOrRef(List<AColGroup> groups) {
+               for(final AColGroup group : groups) {
+                       final boolean morphing = group instanceof AMorphingMMColGroup;
+                       if(morphing || group instanceof IFrameOfReferenceGroup)
+                               return true;
+               }
+               return false;
+       }
+
        /**
         * Detect if the list of groups contains FOR.
         * 
@@ -132,6 +145,8 @@ public final class CLALibUtils {
                for(AColGroup g : groups) {
                        if(g instanceof ColGroupEmpty || g.isEmpty())
                                continue;
+                       else if(g instanceof IFrameOfReferenceGroup)
+                               
filteredGroups.add(((IFrameOfReferenceGroup)g).extractCommon(constV));
                        else if(g instanceof AMorphingMMColGroup)
                                filteredGroups.add(((AMorphingMMColGroup) 
g).extractCommon(constV));
                        else if(g instanceof ColGroupConst)
diff --git a/src/main/java/org/apache/sysds/runtime/frame/data/FrameBlock.java 
b/src/main/java/org/apache/sysds/runtime/frame/data/FrameBlock.java
index b656efadc5..ed5d48d6b3 100644
--- a/src/main/java/org/apache/sysds/runtime/frame/data/FrameBlock.java
+++ b/src/main/java/org/apache/sysds/runtime/frame/data/FrameBlock.java
@@ -857,7 +857,7 @@ public class FrameBlock implements CacheBlock<FrameBlock>, 
Externalizable {
                double size = 0;
                if(_coldata == null) // not allocated estimate if allocated
                        for(int j = 0; j < clen; j++)
-                               size += 
ArrayFactory.getInMemorySize(_schema[j], rlen);
+                               size += 
ArrayFactory.getInMemorySize(_schema[j], rlen, true);
                else {// allocated
                        if(rlen > 1000 && clen > 10 && 
ConfigurationManager.isParallelIOEnabled()) {
                                final ExecutorService pool = 
CommonThreadPool.get(InfrastructureAnalyzer.getLocalParallelism());
@@ -1196,12 +1196,14 @@ public class FrameBlock implements 
CacheBlock<FrameBlock>, Externalizable {
 
        /**
         * Copy src matrix into the index range of the existing current matrix.
+        *
+        * This is used to copy smaller blocks into a larger block, for 
instance in binary reading.
         * 
         * @param rl  row start
         * @param ru  row end inclusive
         * @param cl  col start
         * @param cu  col end inclusive
-        * @param src source FrameBlock
+        * @param src source FrameBlock typically a smaller block.
         */
        public void copy(int rl, int ru, int cl, int cu, FrameBlock src) {
                // If full copy, fall back to default copy
@@ -1210,9 +1212,10 @@ public class FrameBlock implements 
CacheBlock<FrameBlock>, Externalizable {
                        return;
                }
                ensureAllocateMeta();
-               if(_coldata == null)
+               if(_coldata == null) // allocate column data.
                        _coldata = new Array[_schema.length];
-               synchronized(this) {
+               synchronized(this) { // make sync locks
+                       // TODO remove sync locks on array types where they are 
not needed.
                        if(_columnLocks == null) {
                                Object[] locks = new Object[_schema.length];
                                for(int i = 0; i < locks.length; i++)
@@ -1221,7 +1224,7 @@ public class FrameBlock implements 
CacheBlock<FrameBlock>, Externalizable {
                        }
                }
                Object[] locks = _columnLocks.get();
-               for(int j = cl; j <= cu; j++) {
+               for(int j = cl; j <= cu; j++) { // for each column
                        synchronized(locks[j]) { // synchronize on the column.
                                _coldata[j] = ArrayFactory.set(_coldata[j], 
src._coldata[j - cl], rl, ru, _nRow);
                        }
diff --git 
a/src/main/java/org/apache/sysds/runtime/frame/data/columns/ACompressedArray.java
 
b/src/main/java/org/apache/sysds/runtime/frame/data/columns/ACompressedArray.java
index 90ceb5f6a2..a04fae7a2b 100644
--- 
a/src/main/java/org/apache/sysds/runtime/frame/data/columns/ACompressedArray.java
+++ 
b/src/main/java/org/apache/sysds/runtime/frame/data/columns/ACompressedArray.java
@@ -60,8 +60,20 @@ public abstract class ACompressedArray<T> extends Array<T> {
 
        @Override
        public void set(int rl, int ru, Array<T> value) {
-               throw new DMLCompressionException("Invalid to set value in 
CompressedArray");
-       }
+               if(value instanceof DDCArray)
+                       set(rl, ru, (DDCArray<T>) value);
+               else
+                       throw new DMLCompressionException("Invalid to set value 
in CompressedArray");
+       }
+
+       /**
+        * Set the range given.
+        * 
+        * @param rl    row lower
+        * @param ru    row upper (inclusive)
+        * @param value The array to take from
+        */
+       protected abstract void set(int rl, int ru, DDCArray<T> value);
 
        @Override
        public void set(int rl, int ru, Array<T> value, int rlSrc) {
diff --git 
a/src/main/java/org/apache/sysds/runtime/frame/data/columns/Array.java 
b/src/main/java/org/apache/sysds/runtime/frame/data/columns/Array.java
index da9fc883f6..f57e303a49 100644
--- a/src/main/java/org/apache/sysds/runtime/frame/data/columns/Array.java
+++ b/src/main/java/org/apache/sysds/runtime/frame/data/columns/Array.java
@@ -606,6 +606,19 @@ public abstract class Array<T> implements Writable {
                return new ArrayIterator();
        }
 
+       @Override
+       @SuppressWarnings("unchecked")
+       public boolean equals(Object other) {
+               if(!(other instanceof Array))
+                       return false;
+               try {
+                       // the element type is not checked by the cast itself; a ClassCastException raised
+                       // by the typed comparison is reported as "not equal"
+                       return this.equals((Array<T>) other);
+               }
+               catch(ClassCastException e) {
+                       return false;
+               }
+       }
+
+       public abstract boolean equals(Array<T> other);
+
        public ArrayCompressionStatistics statistics(int nSamples) {
 
                Map<T, Integer> d = new HashMap<>();
@@ -631,7 +644,7 @@ public abstract class Array<T> implements Writable {
 
                if(ddcSize < memSize)
                        return new 
ArrayCompressionStatistics(memSizePerElement, //
-                               estDistinct, true, FrameArrayType.DDC, memSize, 
ddcSize);
+                               estDistinct, true, 
getValueType(),FrameArrayType.DDC, memSize, ddcSize);
 
                return null;
        }
diff --git 
a/src/main/java/org/apache/sysds/runtime/frame/data/columns/ArrayFactory.java 
b/src/main/java/org/apache/sysds/runtime/frame/data/columns/ArrayFactory.java
index 1b39d5981b..5291badcea 100644
--- 
a/src/main/java/org/apache/sysds/runtime/frame/data/columns/ArrayFactory.java
+++ 
b/src/main/java/org/apache/sysds/runtime/frame/data/columns/ArrayFactory.java
@@ -23,11 +23,14 @@ import java.io.DataInput;
 import java.io.IOException;
 import java.util.BitSet;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.sysds.common.Types.ValueType;
 import org.apache.sysds.runtime.DMLRuntimeException;
 import org.apache.sysds.utils.MemoryEstimates;
 
 public interface ArrayFactory {
+       public static final Log LOG = 
LogFactory.getLog(ArrayFactory.class.getName());
 
        public final static int bitSetSwitchPoint = 64;
 
@@ -71,31 +74,54 @@ public interface ArrayFactory {
                return new OptionalArray<>(col);
        }
 
-       public static long getInMemorySize(ValueType type, int _numRows) {
-               switch(type) {
-                       case BOOLEAN:
-                               if(_numRows > bitSetSwitchPoint)
-                                       return Array.baseMemoryCost() + (long) 
MemoryEstimates.longArrayCost(_numRows >> 6 + 1);
-                               else
-                                       return Array.baseMemoryCost() + (long) 
MemoryEstimates.booleanArrayCost(_numRows);
-                       case INT64:
-                               return Array.baseMemoryCost() + (long) 
MemoryEstimates.longArrayCost(_numRows);
-                       case FP64:
-                               return Array.baseMemoryCost() + (long) 
MemoryEstimates.doubleArrayCost(_numRows);
-                       case UINT4:
-                       case UINT8:
-                       case INT32:
-                               return Array.baseMemoryCost() + (long) 
MemoryEstimates.intArrayCost(_numRows);
-                       case FP32:
-                               return Array.baseMemoryCost() + (long) 
MemoryEstimates.floatArrayCost(_numRows);
-                       case STRING:
-                               // cannot be known since strings have dynamic 
length
-                               // lets assume something large to make it 
somewhat safe.
-                               return Array.baseMemoryCost() + 
MemoryEstimates.stringCost(12) * _numRows;
-                       case CHARACTER:
-                               return Array.baseMemoryCost() + (long) 
MemoryEstimates.charArrayCost(_numRows);
-                       default: // not applicable
-                               throw new DMLRuntimeException("Invalid type to 
estimate size of :" + type);
+       /**
+        * Estimate the in-memory size of an array with the given value type and number of rows.
+        * <p>
+        * With containsNull set, every type except STRING is costed as the non-null array plus a boolean array of the same
+        * length plus a fixed wrapper overhead; STRING uses the same estimate in both modes.
+        *
+        * @param type         The value type of the array
+        * @param _numRows     The number of rows in the array
+        * @param containsNull If the array has to be able to hold null values
+        * @return The estimated size in bytes
+        * @throws DMLRuntimeException if the value type has no size estimate
+        */
+       public static long getInMemorySize(ValueType type, int _numRows, boolean containsNull) {
+               if(!containsNull) {
+                       // plain arrays: base object cost plus the backing storage
+                       switch(type) {
+                               case BOOLEAN:
+                                       return _numRows > bitSetSwitchPoint ? //
+                                               BitSetArray.estimateInMemorySize(_numRows) : //
+                                               BooleanArray.estimateInMemorySize(_numRows);
+                               case INT64:
+                                       return Array.baseMemoryCost() + (long) MemoryEstimates.longArrayCost(_numRows);
+                               case FP64:
+                                       return Array.baseMemoryCost() + (long) MemoryEstimates.doubleArrayCost(_numRows);
+                               case UINT4:
+                               case UINT8:
+                               case INT32:
+                                       return Array.baseMemoryCost() + (long) MemoryEstimates.intArrayCost(_numRows);
+                               case FP32:
+                                       return Array.baseMemoryCost() + (long) MemoryEstimates.floatArrayCost(_numRows);
+                               case STRING:
+                                       // strings have dynamic length, so the true size is unknown;
+                                       // assume something large to stay on the safe side.
+                                       return Array.baseMemoryCost() + (long) MemoryEstimates.stringCost(12) * _numRows;
+                               case CHARACTER:
+                                       return Array.baseMemoryCost() + (long) MemoryEstimates.charArrayCost(_numRows);
+                               default: // not applicable
+                                       throw new DMLRuntimeException("Invalid type to estimate size of :" + type);
+                       }
+               }
+
+               // arrays that must be able to hold null values
+               switch(type) {
+                       case STRING:
+                               // strings have dynamic length, so the true size is unknown;
+                               // assume something large to stay on the safe side.
+                               return Array.baseMemoryCost() + (long) MemoryEstimates.stringCost(12) * _numRows;
+                       case BOOLEAN:
+                       case INT64:
+                       case FP64:
+                       case UINT4:
+                       case UINT8:
+                       case INT32:
+                       case FP32:
+                       case CHARACTER:
+                               return getInMemorySize(type, _numRows, false) // NotNull Array
+                                       + getInMemorySize(ValueType.BOOLEAN, _numRows, false) // BitSet
+                                       + 16 + Array.baseMemoryCost(); // Optional Overhead
+                       default: // not applicable
+                               throw new DMLRuntimeException("Invalid type to estimate size of :" + type);
                }
        }
 
@@ -131,6 +157,10 @@ public interface ArrayFactory {
                }
        }
 
+       /**
+        * Allocate a DDC array of nRow rows by delegating to allocateLarger on the given DDC array.
+        *
+        * @param <T>   The value type of the DDC array
+        * @param start The DDC array to base the allocation on
+        * @param nRow  The number of rows to allocate
+        * @return The array returned by start.allocateLarger(nRow)
+        */
+       public static <T> DDCArray<T> allocateDDC(DDCArray<T> start, int nRow) {
+               final DDCArray<T> allocated = start.allocateLarger(nRow);
+               return allocated;
+       }
+
        public static ABooleanArray allocateBoolean(int nRow) {
                if(nRow > bitSetSwitchPoint)
                        return new BitSetArray(nRow);
@@ -234,25 +264,42 @@ public interface ArrayFactory {
         */
        @SuppressWarnings("unchecked")
        public static <C> Array<C> set(Array<?> target, Array<?> src, int rl, 
int ru, int rlen) {
-               if(target == null) {
-                       if(src.getFrameArrayType() == FrameArrayType.OPTIONAL)
-                               target = allocateOptional(src.getValueType(), 
rlen);
-                       else
-                               target = allocate(src.getValueType(), rlen);
-               }
-               else if(target.getFrameArrayType() != FrameArrayType.OPTIONAL //
-                       && src.getFrameArrayType() == FrameArrayType.OPTIONAL) {
-                       target = new OptionalArray<>(target, false);
-               }
+               try {
 
-               final ValueType ta = target.getValueType();
-               final ValueType tb = src.getValueType();
-               final ValueType tc = ValueType.getHighestCommonType(ta, tb);
+                       if(target == null) {
+                               // No target yet: allocate rlen rows using the same array kind as the source.
+                               if(src.getFrameArrayType() == FrameArrayType.OPTIONAL)
+                                       target = allocateOptional(src.getValueType(), rlen);
+                               else if(src.getFrameArrayType() == FrameArrayType.DDC)
+                                       target = allocateDDC((DDCArray<?>) src, rlen);
+                               else
+                                       target = allocate(src.getValueType(), rlen);
 
-               Array<C> targetC = (Array<C>) (ta != tc ? target.changeType(tc) 
: target);
-               Array<C> srcC = (Array<C>) (tb != tc ? src.changeType(tc) : 
src);
-               targetC.set(rl, ru, srcC);
-               return targetC;
+                               // ru is an inclusive row index into the rlen rows just allocated, so every
+                               // ru >= rlen is out of range (previously only ru == rlen was rejected here).
+                               if(ru >= rlen)
+                                       throw new DMLRuntimeException("Invalid length to set");
+                       }
+                       else if(target.getFrameArrayType() != FrameArrayType.OPTIONAL //
+                               && src.getFrameArrayType() == FrameArrayType.OPTIONAL) {
+                               // source is optional but the target is not: wrap the target in an OptionalArray
+                               target = new OptionalArray<>(target, false);
+                       }
+
+                       if(target.size() < rlen) {
+                               throw new DMLRuntimeException("Invalid allocated target is not large enough");
+                       }
+
+                       // bring both sides to their highest common value type before copying
+                       final ValueType ta = target.getValueType();
+                       final ValueType tb = src.getValueType();
+                       final ValueType tc = ValueType.getHighestCommonType(ta, tb);
+
+                       Array<C> targetC = (Array<C>) (ta != tc ? target.changeType(tc) : target);
+                       Array<C> srcC = (Array<C>) (tb != tc ? src.changeType(tc) : src);
+                       targetC.set(rl, ru, srcC);
+                       return targetC;
+               }
+               catch(Exception e) {
+                       // rethrow with both arrays and the requested range attached to the original cause
+                       throw new DMLRuntimeException(
+                               "Failed to set subpart with: \n\n" + target + "\n\n" + src + " \n\n " + rl + " " + ru + " " + rlen, e);
+               }
 
        }
 
diff --git 
a/src/main/java/org/apache/sysds/runtime/frame/data/columns/BitSetArray.java 
b/src/main/java/org/apache/sysds/runtime/frame/data/columns/BitSetArray.java
index 6ef4cb1cd2..a331cbc978 100644
--- a/src/main/java/org/apache/sysds/runtime/frame/data/columns/BitSetArray.java
+++ b/src/main/java/org/apache/sysds/runtime/frame/data/columns/BitSetArray.java
@@ -550,6 +550,15 @@ public class BitSetArray extends ABooleanArray {
                return null;
        }
 
+
+       /**
+        * Compare with another boolean array. Only another BitSetArray with the same number of rows and an identical
+        * backing array is considered equal.
+        *
+        * @param other The array to compare with
+        * @return true if other is a BitSetArray of equal size and equal backing data
+        */
+       @Override
+       public boolean equals(Array<Boolean> other) {
+               if(!(other instanceof BitSetArray))
+                       return false;
+               final BitSetArray that = (BitSetArray) other;
+               // _size is tracked separately from _data, so equal backing data alone does not imply equal length
+               return _size == that._size && Arrays.equals(_data, that._data);
+       }
+
        @Override
        public String toString() {
                StringBuilder sb = new StringBuilder(_size + 10);
diff --git 
a/src/main/java/org/apache/sysds/runtime/frame/data/columns/BooleanArray.java 
b/src/main/java/org/apache/sysds/runtime/frame/data/columns/BooleanArray.java
index 75809bda46..22f9505655 100644
--- 
a/src/main/java/org/apache/sysds/runtime/frame/data/columns/BooleanArray.java
+++ 
b/src/main/java/org/apache/sysds/runtime/frame/data/columns/BooleanArray.java
@@ -350,6 +350,14 @@ public class BooleanArray extends ABooleanArray {
                return null;
        }
 
+       /**
+        * Compare with another boolean array; only a BooleanArray with equal backing data is considered equal.
+        *
+        * @param other The array to compare with
+        * @return true if other is a BooleanArray whose _data passes Arrays.equals against this _data
+        */
+       @Override
+       public boolean equals(Array<Boolean> other) {
+               if(!(other instanceof BooleanArray))
+                       return false;
+               final BooleanArray that = (BooleanArray) other;
+               return Arrays.equals(_data, that._data);
+       }
+
        @Override
        public String toString() {
                StringBuilder sb = new StringBuilder(_data.length * 2 + 10);
diff --git 
a/src/main/java/org/apache/sysds/runtime/frame/data/columns/CharArray.java 
b/src/main/java/org/apache/sysds/runtime/frame/data/columns/CharArray.java
index b8953edef6..be7044e907 100644
--- a/src/main/java/org/apache/sysds/runtime/frame/data/columns/CharArray.java
+++ b/src/main/java/org/apache/sysds/runtime/frame/data/columns/CharArray.java
@@ -327,6 +327,14 @@ public class CharArray extends Array<Character> {
                return Character.hashCode(_data[idx]);
        }
 
+       /**
+        * Compare with another character array; only a CharArray with equal backing data is considered equal.
+        *
+        * @param other The array to compare with
+        * @return true if other is a CharArray whose _data passes Arrays.equals against this _data
+        */
+       @Override
+       public boolean equals(Array<Character> other) {
+               return other instanceof CharArray && Arrays.equals(_data, ((CharArray) other)._data);
+       }
+
        @Override
        public String toString() {
                StringBuilder sb = new StringBuilder(_data.length * 2 + 15);
diff --git 
a/src/main/java/org/apache/sysds/runtime/frame/data/columns/DDCArray.java 
b/src/main/java/org/apache/sysds/runtime/frame/data/columns/DDCArray.java
index 445a8f7586..c362c6e800 100644
--- a/src/main/java/org/apache/sysds/runtime/frame/data/columns/DDCArray.java
+++ b/src/main/java/org/apache/sysds/runtime/frame/data/columns/DDCArray.java
@@ -175,6 +175,17 @@ public class DDCArray<T> extends ACompressedArray<T> {
                return dict.analyzeValueType();
        }
 
+       /**
+        * Copy the mapping of another DDC array into rows rl to ru (both inclusive) of this array's mapping.
+        * <p>
+        * The dictionaries must have the same size; when FrameBlock.debug is set they must also be equal.
+        *
+        * @param rl    First row to write (inclusive)
+        * @param ru    Last row to write (inclusive)
+        * @param value The DDC array whose mapping is read starting at index 0
+        * @throws DMLCompressionException if the dictionary of value is not compatible with this dictionary
+        */
+       @Override
+       protected void set(int rl, int ru, DDCArray<T> value) {
+               final boolean sizeDiffers = value.dict.size() != dict.size();
+               if(sizeDiffers || (FrameBlock.debug && !value.dict.equals(dict)))
+                       throw new DMLCompressionException("Invalid setting of DDC Array, of incompatible instance.");
+
+               final AMapToData src = value.map;
+               int srcIdx = 0;
+               for(int row = rl; row <= ru; row++, srcIdx++)
+                       map.set(row, src.getIndex(srcIdx));
+       }
+
        @Override
        public FrameArrayType getFrameArrayType() {
                return FrameArrayType.DDC;
@@ -284,6 +295,26 @@ public class DDCArray<T> extends ACompressedArray<T> {
                return (estDistinct * memSizeBitPerElement) / 8 + 
MapToFactory.estimateInMemorySize(nRow, estDistinct);
        }
 
+       /**
+        * Create a DDC array that reuses this dictionary with a newly created mapping of nRow entries.
+        *
+        * @param nRow The number of rows passed to MapToFactory.create for the new mapping
+        * @return A new DDCArray over the same dict and the new mapping
+        */
+       protected DDCArray<T> allocateLarger(int nRow) {
+               return new DDCArray<>(dict, MapToFactory.create(nRow, map.getUnique()));
+       }
+
+       /**
+        * Report whether null is present, as answered by the dictionary.
+        *
+        * @return the result of dict.containsNull()
+        */
+       @Override
+       public boolean containsNull() {
+               final boolean dictHasNull = dict.containsNull();
+               return dictHasNull;
+       }
+
+       /**
+        * Compare with another array; only a DDCArray with an equal dictionary and an equal mapping is considered equal.
+        *
+        * @param other The array to compare with
+        * @return true if other is a DDCArray and both dict and map are equal
+        */
+       @Override
+       public boolean equals(Array<T> other) {
+               if(!(other instanceof DDCArray))
+                       return false;
+               final DDCArray<T> that = (DDCArray<T>) other;
+               // dictionary first, the mapping is only compared when the dictionaries match
+               return dict.equals(that.dict) && map.equals(that.map);
+       }
+
        @Override
        public String toString() {
                StringBuilder sb = new StringBuilder();
diff --git 
a/src/main/java/org/apache/sysds/runtime/frame/data/columns/DoubleArray.java 
b/src/main/java/org/apache/sysds/runtime/frame/data/columns/DoubleArray.java
index 5991fc24ab..6ac63cba8c 100644
--- a/src/main/java/org/apache/sysds/runtime/frame/data/columns/DoubleArray.java
+++ b/src/main/java/org/apache/sysds/runtime/frame/data/columns/DoubleArray.java
@@ -374,12 +374,19 @@ public class DoubleArray extends Array<Double> {
                return _data[i] != 0.0d;
        }
 
-
        @Override
-       public double hashDouble(int idx){
+       public double hashDouble(int idx) {
                return Double.hashCode(_data[idx]);
        }
 
+       /**
+        * Compare with another double array; only a DoubleArray with equal backing data is considered equal.
+        *
+        * @param other The array to compare with
+        * @return true if other is a DoubleArray whose _data passes Arrays.equals against this _data
+        */
+       @Override
+       public boolean equals(Array<Double> other) {
+               if(!(other instanceof DoubleArray))
+                       return false;
+               final DoubleArray that = (DoubleArray) other;
+               return Arrays.equals(_data, that._data);
+       }
+
        @Override
        public String toString() {
                StringBuilder sb = new StringBuilder(_data.length * 5 + 2);
diff --git 
a/src/main/java/org/apache/sysds/runtime/frame/data/columns/FloatArray.java 
b/src/main/java/org/apache/sysds/runtime/frame/data/columns/FloatArray.java
index 9e6f0748a0..5f463bb0ce 100644
--- a/src/main/java/org/apache/sysds/runtime/frame/data/columns/FloatArray.java
+++ b/src/main/java/org/apache/sysds/runtime/frame/data/columns/FloatArray.java
@@ -326,12 +326,19 @@ public class FloatArray extends Array<Float> {
                return _data[i] != 0.0f;
        }
 
-
        @Override
-       public double hashDouble(int idx){
+       public double hashDouble(int idx) {
                return Float.hashCode(_data[idx]);
        }
 
+       /**
+        * Compare with another float array; only a FloatArray with equal backing data is considered equal.
+        *
+        * @param other The array to compare with
+        * @return true if other is a FloatArray whose _data passes Arrays.equals against this _data
+        */
+       @Override
+       public boolean equals(Array<Float> other) {
+               return other instanceof FloatArray && Arrays.equals(_data, ((FloatArray) other)._data);
+       }
+
        @Override
        public String toString() {
                StringBuilder sb = new StringBuilder(_data.length * 5 + 2);
diff --git 
a/src/main/java/org/apache/sysds/runtime/frame/data/columns/IntegerArray.java 
b/src/main/java/org/apache/sysds/runtime/frame/data/columns/IntegerArray.java
index e7fad8b77b..2c6d3e80f4 100644
--- 
a/src/main/java/org/apache/sysds/runtime/frame/data/columns/IntegerArray.java
+++ 
b/src/main/java/org/apache/sysds/runtime/frame/data/columns/IntegerArray.java
@@ -331,12 +331,19 @@ public class IntegerArray extends Array<Integer> {
                return _data[i] != 0;
        }
 
-
        @Override
-       public double hashDouble(int idx){
+       public double hashDouble(int idx) {
                return Integer.hashCode(_data[idx]);
        }
 
+       /**
+        * Compare with another integer array; only an IntegerArray with equal backing data is considered equal.
+        *
+        * @param other The array to compare with
+        * @return true if other is an IntegerArray whose _data passes Arrays.equals against this _data
+        */
+       @Override
+       public boolean equals(Array<Integer> other) {
+               if(!(other instanceof IntegerArray))
+                       return false;
+               final IntegerArray that = (IntegerArray) other;
+               return Arrays.equals(_data, that._data);
+       }
+
        @Override
        public String toString() {
                StringBuilder sb = new StringBuilder(_data.length * 5 + 2);
diff --git 
a/src/main/java/org/apache/sysds/runtime/frame/data/columns/LongArray.java 
b/src/main/java/org/apache/sysds/runtime/frame/data/columns/LongArray.java
index 933c0039a3..02fa0386f6 100644
--- a/src/main/java/org/apache/sysds/runtime/frame/data/columns/LongArray.java
+++ b/src/main/java/org/apache/sysds/runtime/frame/data/columns/LongArray.java
@@ -333,12 +333,19 @@ public class LongArray extends Array<Long> {
                return _data[i] != 0;
        }
 
-
        @Override
-       public double hashDouble(int idx){
+       public double hashDouble(int idx) {
                return Long.hashCode(_data[idx]);
        }
 
+       /**
+        * Compare with another long array; only a LongArray with equal backing data is considered equal.
+        *
+        * @param other The array to compare with
+        * @return true if other is a LongArray whose _data passes Arrays.equals against this _data
+        */
+       @Override
+       public boolean equals(Array<Long> other) {
+               return other instanceof LongArray && Arrays.equals(_data, ((LongArray) other)._data);
+       }
+
        @Override
        public String toString() {
                StringBuilder sb = new StringBuilder(_data.length * 5 + 2);
diff --git 
a/src/main/java/org/apache/sysds/runtime/frame/data/columns/OptionalArray.java 
b/src/main/java/org/apache/sysds/runtime/frame/data/columns/OptionalArray.java
index 56e66ba444..2405c22756 100644
--- 
a/src/main/java/org/apache/sysds/runtime/frame/data/columns/OptionalArray.java
+++ 
b/src/main/java/org/apache/sysds/runtime/frame/data/columns/OptionalArray.java
@@ -99,6 +99,15 @@ public class OptionalArray<T> extends Array<T> {
                return 1L + _a.getExactSerializedSize() + 
_n.getExactSerializedSize();
        }
 
+       /**
+        * In-memory size of this optional array: the base size reported by the super class, 16 bytes for the two object
+        * pointers, and the sizes of the wrapped value array and the boolean array _n.
+        *
+        * @return The size in bytes
+        */
+       @Override
+       public long getInMemorySize() {
+               final long base = super.getInMemorySize(); // object header + object reference
+               final long pointers = 16; // object pointers.
+               return base + pointers + _a.getInMemorySize() + _n.getInMemorySize();
+       }
+
        @Override
        public void readFields(DataInput in) throws IOException {
                throw new DMLRuntimeException("Should not be called");
@@ -200,7 +209,7 @@ public class OptionalArray<T> extends Array<T> {
                Array<Boolean> nulls = value.getNulls();
                if(nulls != null)
                        _n.set(rl, ru, nulls, rlSrc);
-               else{
+               else {
                        for(int i = rl; i <= ru; i++)
                                _n.set(i, true);
                }
@@ -422,27 +431,35 @@ public class OptionalArray<T> extends Array<T> {
                        default:
                                return changeTypeString(); // String can 
contain null
                }
-
        }
 
        @Override
-       public boolean containsNull(){
+       public boolean containsNull() {
                return !_n.isAllTrue();
        }
 
-
        @Override
-       public double hashDouble(int idx){
+       public double hashDouble(int idx) {
                if(_n.get(idx))
                        return _a.hashDouble(idx);
                else
                        return Double.NaN;
        }
 
+       /**
+        * Compare with another array; only an OptionalArray whose boolean array _n and wrapped value array _a are both
+        * equal is considered equal.
+        *
+        * @param other The array to compare with
+        * @return true if other is an OptionalArray with equal _n and equal _a
+        */
+       @Override
+       public boolean equals(Array<T> other) {
+               if(!(other instanceof OptionalArray))
+                       return false;
+               final OptionalArray<T> that = (OptionalArray<T>) other;
+               // _n is compared first; the wrapped values are only compared when _n matches
+               return _n.equals(that._n) && that._a.equals(_a);
+       }
+
        @Override
        public String toString() {
                StringBuilder sb = new StringBuilder(_size + 2);
-               sb.append(super.toString() + "<" + 
_a.getClass().getSimpleName() + ">:[");
+               sb.append(super.toString() + "<" + _a.getValueType() + ">:[");
                for(int i = 0; i < _size - 1; i++)
                        sb.append(get(i) + ",");
                sb.append(get(_size - 1));
diff --git 
a/src/main/java/org/apache/sysds/runtime/frame/data/columns/RaggedArray.java 
b/src/main/java/org/apache/sysds/runtime/frame/data/columns/RaggedArray.java
index 5010a27a6b..b8f49679b2 100644
--- a/src/main/java/org/apache/sysds/runtime/frame/data/columns/RaggedArray.java
+++ b/src/main/java/org/apache/sysds/runtime/frame/data/columns/RaggedArray.java
@@ -252,4 +252,9 @@ public class RaggedArray<T> extends Array<T> {
                throw new NotImplementedException("Unimplemented method 
'hashDouble'");
        }
 
+       /**
+        * Equality is not implemented for ragged arrays; every call throws.
+        *
+        * @param other The array to compare with (unused)
+        * @return never returns normally
+        * @throws NotImplementedException always
+        */
+       @Override
+       public boolean equals(Array<T> other) {
+               final String msg = "Unimplemented method 'equals'";
+               throw new NotImplementedException(msg);
+       }
+
 }
diff --git 
a/src/main/java/org/apache/sysds/runtime/frame/data/columns/StringArray.java 
b/src/main/java/org/apache/sysds/runtime/frame/data/columns/StringArray.java
index 115a7103ce..8eddc37707 100644
--- a/src/main/java/org/apache/sysds/runtime/frame/data/columns/StringArray.java
+++ b/src/main/java/org/apache/sysds/runtime/frame/data/columns/StringArray.java
@@ -669,6 +669,14 @@ public class StringArray extends Array<String> {
                        return Double.NaN;
        }
 
+       /**
+        * Compare with another string array; only a StringArray with equal backing data is considered equal.
+        *
+        * @param other The array to compare with
+        * @return true if other is a StringArray whose _data passes Arrays.equals against this _data
+        */
+       @Override
+       public boolean equals(Array<String> other) {
+               if(!(other instanceof StringArray))
+                       return false;
+               final StringArray that = (StringArray) other;
+               return Arrays.equals(_data, that._data);
+       }
+
        @Override
        public String toString() {
                StringBuilder sb = new StringBuilder(_size * 5 + 2);
diff --git 
a/src/main/java/org/apache/sysds/runtime/frame/data/compress/ArrayCompressionStatistics.java
 
b/src/main/java/org/apache/sysds/runtime/frame/data/compress/ArrayCompressionStatistics.java
index ae41655482..ec98e9847a 100644
--- 
a/src/main/java/org/apache/sysds/runtime/frame/data/compress/ArrayCompressionStatistics.java
+++ 
b/src/main/java/org/apache/sysds/runtime/frame/data/compress/ArrayCompressionStatistics.java
@@ -19,6 +19,7 @@
 
 package org.apache.sysds.runtime.frame.data.compress;
 
+import org.apache.sysds.common.Types.ValueType;
 import org.apache.sysds.runtime.frame.data.columns.ArrayFactory.FrameArrayType;
 
 public class ArrayCompressionStatistics {
@@ -26,15 +27,17 @@ public class ArrayCompressionStatistics {
        public final long originalSize;
        public final long compressedSizeEstimate;
        public final boolean shouldCompress;
+       public final ValueType valueType;
        public final FrameArrayType bestType;
        public final int bitPerValue;
        public final int nUnique;
 
-       public ArrayCompressionStatistics(int bitPerValue, int nUnique, boolean 
shouldCompress, FrameArrayType bestType,
-               long originalSize, long compressedSizeEstimate) {
+       public ArrayCompressionStatistics(int bitPerValue, int nUnique, boolean 
shouldCompress, ValueType valueType,
+               FrameArrayType bestType, long originalSize, long 
compressedSizeEstimate) {
                this.bitPerValue = bitPerValue;
                this.nUnique = nUnique;
                this.shouldCompress = shouldCompress;
+               this.valueType = valueType;
                this.bestType = bestType;
                this.originalSize = originalSize;
                this.compressedSizeEstimate = compressedSizeEstimate;
@@ -43,8 +46,8 @@ public class ArrayCompressionStatistics {
        @Override
        public String toString() {
                StringBuilder sb = new StringBuilder();
-               sb.append(String.format("Compressed Stats: size:%6d->%6d, 
Use:%10s, Unique:%5d", originalSize,
-                       compressedSizeEstimate, bestType.toString(), nUnique));
+               sb.append(String.format("Compressed Stats: size:%8d->%8d, 
Use:%10s, Unique:%6d, ValueType:%7s", originalSize,
+                       compressedSizeEstimate, bestType.toString(), nUnique, 
valueType));
                return sb.toString();
        }
 }
diff --git 
a/src/main/java/org/apache/sysds/runtime/frame/data/compress/CompressedFrameBlockFactory.java
 
b/src/main/java/org/apache/sysds/runtime/frame/data/compress/CompressedFrameBlockFactory.java
index b3246863c9..90f24b527c 100644
--- 
a/src/main/java/org/apache/sysds/runtime/frame/data/compress/CompressedFrameBlockFactory.java
+++ 
b/src/main/java/org/apache/sysds/runtime/frame/data/compress/CompressedFrameBlockFactory.java
@@ -99,9 +99,10 @@ public class CompressedFrameBlockFactory {
                if(LOG.isDebugEnabled()) {
                        for(int i = 0; i < compressedColumns.length; i++) {
                                if(stats[i] != null)
-                                       LOG.debug(stats[i]);
+                                       LOG.debug(String.format("Col: %3d, %s", 
i, stats[i]));
                                else
-                                       LOG.debug("no Comp col: " + i);
+                                       LOG.debug(
+                                               String.format("Col: %3d, No 
Compress, Type: %s", i, in.getColumn(i).getClass().getSimpleName()));
                        }
                }
        }
diff --git 
a/src/main/java/org/apache/sysds/runtime/io/FrameReaderBinaryBlock.java 
b/src/main/java/org/apache/sysds/runtime/io/FrameReaderBinaryBlock.java
index 2627efa215..7625737d9f 100644
--- a/src/main/java/org/apache/sysds/runtime/io/FrameReaderBinaryBlock.java
+++ b/src/main/java/org/apache/sysds/runtime/io/FrameReaderBinaryBlock.java
@@ -32,93 +32,88 @@ import org.apache.sysds.conf.ConfigurationManager;
 import org.apache.sysds.runtime.DMLRuntimeException;
 import org.apache.sysds.runtime.frame.data.FrameBlock;
 
-
 /**
  * Single-threaded frame binary block reader.
  * 
  */
-public class FrameReaderBinaryBlock extends FrameReader{
-       
+public class FrameReaderBinaryBlock extends FrameReader {
+
        // private static final Log LOG = 
LogFactory.getLog(FrameReaderBinaryBlock.class.getName());
-       
+
        @Override
-       public final FrameBlock readFrameFromHDFS(String fname, ValueType[] 
schema, String[] names, long rlen, long clen) 
-               throws IOException, DMLRuntimeException 
-       {
-               //allocate output frame block
+       public final FrameBlock readFrameFromHDFS(String fname, ValueType[] 
schema, String[] names, long rlen, long clen)
+               throws IOException, DMLRuntimeException {
+               // allocate output frame block
                ValueType[] lschema = createOutputSchema(schema, clen);
                String[] lnames = createOutputNames(names, clen);
-               FrameBlock ret = new FrameBlock(lschema, lnames, (int)rlen);
-               
-               //prepare file access
-               JobConf job = new 
JobConf(ConfigurationManager.getCachedJobConf());     
-               Path path = new Path( fname ); 
+               FrameBlock ret = new FrameBlock(lschema, lnames, (int) rlen);
+
+               // prepare file access
+               JobConf job = new 
JobConf(ConfigurationManager.getCachedJobConf());
+               Path path = new Path(fname);
                FileSystem fs = IOUtilFunctions.getFileSystem(path, job);
-               
-               //check existence and non-empty file
-               checkValidInputFile(fs, path); 
-       
-               //core read (sequential/parallel)
+
+               // check existence and non-empty file
+               checkValidInputFile(fs, path);
+
+               // core read (sequential/parallel)
                readBinaryBlockFrameFromHDFS(path, job, fs, ret, rlen, clen);
                return ret;
        }
-       
+
        @Override
        public FrameBlock readFrameFromInputStream(InputStream is, ValueType[] 
schema, String[] names, long rlen, long clen)
-               throws IOException, DMLRuntimeException 
-       {
+               throws IOException, DMLRuntimeException {
                throw new DMLRuntimeException("Not implemented yet.");
        }
 
-       protected void readBinaryBlockFrameFromHDFS( Path path, JobConf job, 
FileSystem fs, FrameBlock dest, long rlen, long clen )
-               throws IOException, DMLRuntimeException
-       {
-               //sequential read from sequence files
-               for( Path lpath : IOUtilFunctions.getSequenceFilePaths(fs, 
path) ) //1..N files 
+       protected void readBinaryBlockFrameFromHDFS(Path path, JobConf job, 
FileSystem fs, FrameBlock dest, long rlen,
+               long clen) throws IOException, DMLRuntimeException {
+               // sequential read from sequence files
+               for(Path lpath : IOUtilFunctions.getSequenceFilePaths(fs, 
path)) // 1..N files
                        readBinaryBlockFrameFromSequenceFile(lpath, job, fs, 
dest);
        }
 
-       @SuppressWarnings({ "deprecation" })
-       protected static void readBinaryBlockFrameFromSequenceFile( Path path, 
JobConf job, FileSystem fs, FrameBlock dest )
-               throws IOException, DMLRuntimeException
-       {
-               int rlen = dest.getNumRows();
-               int clen = dest.getNumColumns();
-               
-               //directly read from sequence files (individual partfiles)
-               SequenceFile.Reader reader = new 
SequenceFile.Reader(fs,path,job);
-               LongWritable key = new LongWritable(-1L);
-               FrameBlock value = new FrameBlock();
-               
-               try
-               {
-                       while( reader.next(key, value) ) {
-                               int row_offset = (int)(key.get()-1);
-                               int rows = value.getNumRows();
-                               int cols = value.getNumColumns();
-                               
-                               if(rows == 0 || cols == 0)      //Empty block, 
ignore it.
+       protected static void readBinaryBlockFrameFromSequenceFile(Path path, 
JobConf job, FileSystem fs, FrameBlock dest)
+               throws IOException, DMLRuntimeException {
+               final int rlen = dest.getNumRows();
+               final int clen = dest.getNumColumns();
+
+               SequenceFile.Reader reader = new SequenceFile.Reader(job, 
SequenceFile.Reader.file(path));
+               LongWritable key = new LongWritable(-1L); // row block
+               FrameBlock value = new FrameBlock(); // contained values.
+
+               try {
+                       while(reader.next(key, value)) {
+                               final int row_offset = (int) (key.get() - 1);
+                               final int rows = value.getNumRows();
+                               final int cols = value.getNumColumns();
+
+                               if(rows == 0 || cols == 0) // Empty block, 
ignore it.
                                        continue;
-                               
-                               //bound check per block
-                               if( row_offset + rows < 0 || row_offset + rows 
> rlen ) {
-                                       throw new IOException("Frame block 
["+(row_offset+1)+":"+(row_offset+rows)+","+":"+"] " +
-                                                                     "out of 
overall frame range [1:"+rlen+",1:"+clen+"].");
+
+                               // bound check per block
+                               if(row_offset + rows < 0 || row_offset + rows > 
rlen) {
+                                       throw new IOException("Frame block [" + 
(row_offset + 1) + ":" + (row_offset + rows) + "," + ":" + "] "
+                                               + "out of overall frame range 
[1:" + rlen + ",1:" + clen + "].");
                                }
-               
-                               //copy block into target frame, incl meta on 
first
-                               dest.copy( row_offset, row_offset+rows-1, 0, 
cols-1, value);
-                               if( row_offset==0 ) {
+
+                               // copy block into target frame, incl meta on 
first
+                               dest.copy(row_offset, row_offset + rows - 1, 0, 
cols - 1, value);
+                               if(row_offset == 0) {
                                        
dest.setColumnNames(value.getColumnNames());
                                        
dest.setColumnMetadata(value.getColumnMetadata());
                                }
                        }
                }
+               catch(ArrayIndexOutOfBoundsException e) {
+                       throw new IOException("Failed while reading block: " + 
(key.get() - 1), e);
+               }
                finally {
                        IOUtilFunctions.closeSilently(reader);
                }
-       }       
-       
+       }
+
        /**
         * Specific functionality of FrameReaderBinaryBlock, mostly used for 
testing.
         * 
@@ -127,26 +122,25 @@ public class FrameReaderBinaryBlock extends FrameReader{
         * @throws IOException if IOException occurs
         */
        @SuppressWarnings("deprecation")
-       public FrameBlock readFirstBlock(String fname) throws IOException 
-       {
-               //prepare file access
-               JobConf job = new 
JobConf(ConfigurationManager.getCachedJobConf());     
-               Path path = new Path( fname ); 
+       public FrameBlock readFirstBlock(String fname) throws IOException {
+               // prepare file access
+               JobConf job = new 
JobConf(ConfigurationManager.getCachedJobConf());
+               Path path = new Path(fname);
                FileSystem fs = IOUtilFunctions.getFileSystem(path, job);
-               
+
                LongWritable key = new LongWritable();
                FrameBlock value = new FrameBlock();
-               
-               //read first block from first file
-               Path lpath = IOUtilFunctions.getSequenceFilePaths(fs, path)[0]; 
 
-               SequenceFile.Reader reader = new 
SequenceFile.Reader(fs,lpath,job);             
+
+               // read first block from first file
+               Path lpath = IOUtilFunctions.getSequenceFilePaths(fs, path)[0];
+               SequenceFile.Reader reader = new SequenceFile.Reader(fs, lpath, 
job);
                try {
                        reader.next(key, value);
                }
                finally {
                        IOUtilFunctions.closeSilently(reader);
                }
-               
+
                return value;
        }
 }
diff --git a/src/main/java/org/apache/sysds/runtime/io/MatrixWriterFactory.java 
b/src/main/java/org/apache/sysds/runtime/io/MatrixWriterFactory.java
index 82af28c0bf..bb0b0c940f 100644
--- a/src/main/java/org/apache/sysds/runtime/io/MatrixWriterFactory.java
+++ b/src/main/java/org/apache/sysds/runtime/io/MatrixWriterFactory.java
@@ -19,14 +19,17 @@
 
 package org.apache.sysds.runtime.io;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.sysds.common.Types.FileFormat;
 import org.apache.sysds.conf.CompilerConfig.ConfigType;
 import org.apache.sysds.conf.ConfigurationManager;
 import org.apache.sysds.runtime.DMLRuntimeException;
 import org.apache.sysds.runtime.compress.io.WriterCompressed;
 
-public class MatrixWriterFactory
-{
+public class MatrixWriterFactory{
+
+       protected static final Log LOG = 
LogFactory.getLog(MatrixWriterFactory.class.getName());
 
        public static MatrixWriter createMatrixWriter(FileFormat fmt) {
                return createMatrixWriter(fmt, -1, null);
@@ -74,8 +77,10 @@ public class MatrixWriterFactory
                        case BINARY:
                                if( 
ConfigurationManager.getCompilerConfigFlag(ConfigType.PARALLEL_CP_WRITE_BINARYFORMATS)
 )
                                        writer = new 
WriterBinaryBlockParallel(replication);
-                               else
+                               else{
+                                       LOG.warn("Using single threaded binary 
writer");
                                        writer = new 
WriterBinaryBlock(replication);
+                               }
                                break;
 
                        case HDF5:
diff --git a/src/main/java/org/apache/sysds/runtime/io/WriterBinaryBlock.java 
b/src/main/java/org/apache/sysds/runtime/io/WriterBinaryBlock.java
index 718584acfe..7991c5701f 100644
--- a/src/main/java/org/apache/sysds/runtime/io/WriterBinaryBlock.java
+++ b/src/main/java/org/apache/sysds/runtime/io/WriterBinaryBlock.java
@@ -25,7 +25,9 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.SequenceFile.Writer;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.sysds.conf.CompilerConfig.ConfigType;
 import org.apache.sysds.conf.ConfigurationManager;
+import org.apache.sysds.hops.OptimizerUtils;
 import org.apache.sysds.runtime.DMLRuntimeException;
 import org.apache.sysds.runtime.compress.CompressedMatrixBlock;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
@@ -54,8 +56,18 @@ public class WriterBinaryBlock extends MatrixWriter {
                if(HDFSTool.USE_BINARYBLOCK_SERIALIZATION)
                        HDFSTool.addBinaryBlockSerializationFramework(job);
 
-               if(src instanceof CompressedMatrixBlock)
-                       src = CompressedMatrixBlock.getUncompressed(src, 
"Decompressing for binary write");
+               if(src instanceof CompressedMatrixBlock) {
+                       
if(ConfigurationManager.getCompilerConfigFlag(ConfigType.PARALLEL_CP_WRITE_BINARYFORMATS)){
+                               LOG.debug("Multi threaded decompression");
+                               // parallel
+                               src = 
CompressedMatrixBlock.getUncompressed(src, "binary write",
+                                       
OptimizerUtils.getParallelBinaryWriteParallelism());
+                       }
+                       else {
+                               LOG.warn("Single threaded decompression");
+                               src = 
CompressedMatrixBlock.getUncompressed(src, "binary write");
+                       }
+               }
 
                // core write sequential/parallel
                if(diag)
diff --git a/src/main/java/org/apache/sysds/runtime/util/UtilFunctions.java 
b/src/main/java/org/apache/sysds/runtime/util/UtilFunctions.java
index 4562a2dc4e..cd6a484326 100644
--- a/src/main/java/org/apache/sysds/runtime/util/UtilFunctions.java
+++ b/src/main/java/org/apache/sysds/runtime/util/UtilFunctions.java
@@ -512,6 +512,7 @@ public class UtilFunctions {
                        case INT64:   return (Long)in;
                        case INT32:   return (Integer)in;
                        case BOOLEAN: return ((Boolean)in) ? 1 : 0;
+                       case CHARACTER: return (int)((Character)in);
                        case STRING:
                                try {
                                        return !((String) in).isEmpty() ? 
Double.parseDouble((String) in) : 0;
diff --git 
a/src/test/java/org/apache/sysds/test/component/compress/combine/CombineEncodings.java
 
b/src/test/java/org/apache/sysds/test/component/compress/combine/CombineEncodings.java
new file mode 100644
index 0000000000..5a3f66ca37
--- /dev/null
+++ 
b/src/test/java/org/apache/sysds/test/component/compress/combine/CombineEncodings.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.component.compress.combine;
+
+import static org.junit.Assert.assertTrue;
+
+import java.util.Map;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sysds.runtime.compress.colgroup.mapping.MapToFactory;
+import org.apache.sysds.runtime.compress.estim.encoding.DenseEncoding;
+import org.apache.sysds.runtime.compress.estim.encoding.IEncode;
+import org.junit.Test;
+
+public class CombineEncodings {
+
+       public static final Log LOG = 
LogFactory.getLog(CombineEncodings.class.getName());
+
+       @Test
+       public void combineCustom() {
+               IEncode ae = new DenseEncoding(MapToFactory.create(10, new 
int[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, 10));
+               IEncode be = new DenseEncoding(MapToFactory.create(10, new 
int[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, 10));
+               Pair<IEncode, Map<Integer, Integer>> cec = 
ae.combineWithMap(be);
+               IEncode ce = cec.getLeft();
+               Map<Integer, Integer> cem = cec.getRight();
+               assertTrue(cem.size() == 10);
+               assertTrue(cem.size() == ce.getUnique());
+               assertTrue(ce.equals(new DenseEncoding(MapToFactory.create(10, 
new int[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, 10))));
+
+       }
+
+       @Test
+       public void combineCustom2() {
+               IEncode ae = new DenseEncoding(MapToFactory.create(10, new 
int[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 8}, 10));
+               IEncode be = new DenseEncoding(MapToFactory.create(10, new 
int[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, 10));
+               Pair<IEncode, Map<Integer, Integer>> cec = 
ae.combineWithMap(be);
+               IEncode ce = cec.getLeft();
+               Map<Integer, Integer> cem = cec.getRight();
+               assertTrue(cem.size() == 10);
+               assertTrue(cem.size() == ce.getUnique());
+               assertTrue(ce.equals(new DenseEncoding(MapToFactory.create(10, 
new int[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, 10))));
+
+       }
+
+       @Test
+       public void combineCustom3() {
+               IEncode ae = new DenseEncoding(MapToFactory.create(10, new 
int[] {0, 1, 2, 3, 4, 5, 6, 7, 7, 8}, 10));
+               IEncode be = new DenseEncoding(MapToFactory.create(10, new 
int[] {0, 1, 2, 3, 4, 5, 6, 7, 7, 9}, 10));
+               Pair<IEncode, Map<Integer, Integer>> cec = 
ae.combineWithMap(be);
+               IEncode ce = cec.getLeft();
+               Map<Integer, Integer> cem = cec.getRight();
+               assertTrue(cem.size() == 9);
+               assertTrue(cem.size() == ce.getUnique());
+               assertTrue(ce.equals(new DenseEncoding(MapToFactory.create(10, 
new int[] {0, 1, 2, 3, 4, 5, 6, 7, 7, 8}, 9))));
+
+       }
+
+       @Test
+       public void combineCustom4() {
+               IEncode ae = new DenseEncoding(MapToFactory.create(10, new 
int[] {0, 1, 2, 3, 4, 5, 6, 7, 7, 0}, 10));
+               IEncode be = new DenseEncoding(MapToFactory.create(10, new 
int[] {0, 1, 2, 3, 4, 5, 6, 7, 7, 0}, 10));
+               Pair<IEncode, Map<Integer, Integer>> cec = 
ae.combineWithMap(be);
+               IEncode ce = cec.getLeft();
+               Map<Integer, Integer> cem = cec.getRight();
+               assertTrue(cem.size() == 8);
+               assertTrue(cem.size() == ce.getUnique());
+               assertTrue(ce.equals(new DenseEncoding(MapToFactory.create(10, 
new int[] {0, 1, 2, 3, 4, 5, 6, 7, 7, 0}, 8))));
+       }
+}
diff --git 
a/src/test/java/org/apache/sysds/test/component/compress/combine/CombineEncodingsUnique.java
 
b/src/test/java/org/apache/sysds/test/component/compress/combine/CombineEncodingsUnique.java
new file mode 100644
index 0000000000..bff3ba1709
--- /dev/null
+++ 
b/src/test/java/org/apache/sysds/test/component/compress/combine/CombineEncodingsUnique.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.component.compress.combine;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+
+import org.apache.commons.lang3.NotImplementedException;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sysds.runtime.compress.colgroup.mapping.AMapToData;
+import org.apache.sysds.runtime.compress.colgroup.mapping.MapToFactory;
+import org.apache.sysds.runtime.compress.colgroup.offset.AOffset;
+import org.apache.sysds.runtime.compress.colgroup.offset.OffsetFactory;
+import org.apache.sysds.runtime.compress.estim.encoding.DenseEncoding;
+import org.apache.sysds.runtime.compress.estim.encoding.EncodingFactory;
+import org.apache.sysds.runtime.compress.estim.encoding.IEncode;
+import org.apache.sysds.runtime.compress.estim.encoding.SparseEncoding;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+@RunWith(value = Parameterized.class)
+public class CombineEncodingsUnique {
+       public static final Log LOG = 
LogFactory.getLog(CombineEncodingsUnique.class.getName());
+       private final IEncode ae;
+       private final IEncode be;
+
+       private enum MapVar {
+               V1, V2;
+       }
+
+       @Parameters
+       public static Collection<Object[]> data() {
+               List<Object[]> tests = new ArrayList<>();
+               try {
+                       int[] unique = new int[] {1, 3, 6};
+                       int[] seeds = new int[] {1, 3214, 2, 13};
+                       int[] sizes = new int[] {10, 12, 32, 56};
+
+                       for(int u : unique) {
+                               for(int s : sizes) {
+                                       for(int se : seeds) {
+                                               tests.add(new Object[] 
{genDense(u, s, se, MapVar.V1), genDense(u, s, se + 1, MapVar.V2)});
+                                               final int maxRows = 5 * s;
+                                               SparseEncoding sp = 
genSparse(u, s, 5, maxRows, se, MapVar.V2);
+                                               SparseEncoding sp2 = 
genSparse(u, s, 5, maxRows, se + 1, MapVar.V2);
+                                               DenseEncoding de = genDense(u, 
sp.getNumRows(), se + 2, MapVar.V2);
+
+                                               tests.add(new Object[] {de, 
sp});
+                                               tests.add(new Object[] {sp, 
sp2});
+                                               tests.add(new Object[] {sp2, 
sp});
+
+                                       }
+                               }
+                       }
+               }
+               catch(Exception e) {
+                       e.printStackTrace();
+                       fail("failed constructing tests");
+               }
+
+               return tests;
+       }
+
+       public CombineEncodingsUnique(IEncode a, IEncode b) {
+               this.ae = a;
+               this.be = b;
+       }
+
+       @Test
+       public void combineUnique() {
+               try {
+
+                       Pair<IEncode, Map<Integer, Integer>> cec = 
ae.combineWithMap(be);
+                       IEncode ce = cec.getLeft();
+                       Map<Integer, Integer> cem = cec.getRight();
+                       // LOG.error(ae + "\n" + be + "\n" + ce + "\n" + cem);
+                       assertEquals(cem.size(), ce.getUnique());
+                       // check all unique values are contained.
+                       checkContainsAllUnique(ce);
+               }
+               catch(Exception e) {
+                       e.printStackTrace();
+                       LOG.error("Failed to combine " + ae + " " + be);
+                       fail(e.getMessage());
+               }
+       }
+
+       private void checkContainsAllUnique(IEncode ce) {
+               if(ce instanceof DenseEncoding) {
+                       DenseEncoding ced = (DenseEncoding) ce;
+                       AMapToData m = ced.getMap();
+                       Set<Integer> s = new HashSet<>();
+                       for(int i = 0; i < m.size(); i++) {
+                               s.add(m.getIndex(i));
+                       }
+
+                       assertEquals(m.getUnique(), s.size());
+               }
+               else {
+                       throw new NotImplementedException();
+               }
+       }
+
+       private static DenseEncoding genDense(int unique, int size, int seed, 
MapVar v) {
+               return new DenseEncoding(genMap(unique, size, seed, v));
+       }
+
+       // private static SparseEncoding genSparse(int unique, int size, int 
delta, int seed, MapVar v) {
+       // AOffset of = genOffset(size, delta, seed);
+       // AMapToData map = genMap(unique, size, seed + 1, v);
+       // return EncodingFactory.createSparse(map, of, of.getOffsetToLast() + 
10);
+       // }
+
+       private static SparseEncoding genSparse(int unique, int size, int 
delta, int nRows, int seed, MapVar v) {
+               AOffset of = genOffset(size, delta, nRows, seed);
+               AMapToData map = genMap(unique, size, seed + 1, v);
+               return EncodingFactory.createSparse(map, of, nRows);
+       }
+
+       private static AMapToData genMap(int unique, int size, int seed, MapVar 
v) {
+               switch(v) {
+                       case V1:
+                               return genMapV1(unique, size, seed);
+                       case V2:
+                       default:
+                               return genMapV2(unique, size, seed);
+
+               }
+       }
+
+       private static AMapToData genMapV1(int unique, int size, int seed) {
+               AMapToData m = MapToFactory.create(size, unique);
+               for(int i = 0; i < unique; i++) {
+                       m.set(i, i);
+               }
+               Random r = new Random(seed);
+               for(int i = unique; i < size; i++) {
+                       m.set(i, r.nextInt(unique));
+               }
+               return m;
+       }
+
+       private static AMapToData genMapV2(int unique, int size, int seed) {
+               AMapToData m = MapToFactory.create(size, unique);
+               Random r = new Random(seed);
+               for(int i = 0; i < size - unique; i++) {
+                       m.set(i, r.nextInt(unique));
+               }
+               for(int i = 0; i < unique; i++) {
+                       m.set(i + size - unique, i);
+               }
+               return m;
+       }
+
+       private static AOffset genOffset(int size, int delta, int max, int 
seed) {
+               int[] offsets = new int[size];
+               Random r = new Random(seed);
+               int off = offsets[0] = r.nextInt(delta);
+               for(int i = 1; i < size; i++) {
+                       off = offsets[i] = off + 1 + r.nextInt(delta);
+               }
+               return OffsetFactory.createOffset(offsets);
+       }
+}
diff --git 
a/src/test/java/org/apache/sysds/test/component/compress/dictionary/CombineTest.java
 
b/src/test/java/org/apache/sysds/test/component/compress/dictionary/CombineTest.java
index 5e3286e73c..090741de67 100644
--- 
a/src/test/java/org/apache/sysds/test/component/compress/dictionary/CombineTest.java
+++ 
b/src/test/java/org/apache/sysds/test/component/compress/dictionary/CombineTest.java
@@ -25,6 +25,9 @@ import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import java.util.HashMap;
+import java.util.Map;
+
 import org.apache.commons.lang3.NotImplementedException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -32,6 +35,7 @@ import org.apache.sysds.runtime.compress.colgroup.AColGroup;
 import org.apache.sysds.runtime.compress.colgroup.AColGroup.CompressionType;
 import org.apache.sysds.runtime.compress.colgroup.AColGroupCompressed;
 import org.apache.sysds.runtime.compress.colgroup.ADictBasedColGroup;
+import org.apache.sysds.runtime.compress.colgroup.ASDC;
 import org.apache.sysds.runtime.compress.colgroup.ColGroupConst;
 import org.apache.sysds.runtime.compress.colgroup.ColGroupDDC;
 import org.apache.sysds.runtime.compress.colgroup.ColGroupEmpty;
@@ -480,7 +484,6 @@ public class CombineTest {
                }
        }
 
-
        @Test
        public void combineDictionariesSparse2() {
                try {
@@ -502,4 +505,228 @@ public class CombineTest {
                        fail(e.getMessage());
                }
        }
+
+       @Test
+       public void combineMockingEmpty() {
+               ADictionary ad = Dictionary.create(new double[] {1, 2, 3, 4});
+               double[] ade = new double[] {0};
+               AColGroupCompressed a = mockSDC(ad, ade);
+               AColGroupCompressed b = mockSDC(ad, ade);
+
+               Map<Integer, Integer> m = new HashMap<>();
+               ADictionary red = DictionaryFactory.combineDictionaries(a, b, 
m);
+
+               assertEquals(red.getNumberOfValues(2), 0);
+       }
+
+       @Test
+       public void combineMockingDefault() {
+               ADictionary ad = Dictionary.create(new double[] {1, 2, 3, 4});
+               double[] ade = new double[] {0};
+               AColGroupCompressed a = mockSDC(ad, ade);
+               AColGroupCompressed b = mockSDC(ad, ade);
+
+               Map<Integer, Integer> m = new HashMap<>();
+               m.put(0, 0);
+               ADictionary red = DictionaryFactory.combineDictionaries(a, b, 
m);
+
+               assertEquals(red.getNumberOfValues(2), 1);
+               assertEquals(red, Dictionary.createNoCheck(new double[] {0, 
0}));
+       }
+
+       @Test
+       public void combineMockingFirstValue() {
+               ADictionary ad = Dictionary.create(new double[] {1, 2, 3, 4});
+               double[] ade = new double[] {0};
+               AColGroupCompressed a = mockSDC(ad, ade);
+               AColGroupCompressed b = mockSDC(ad, ade);
+
+               Map<Integer, Integer> m = new HashMap<>();
+               m.put(1, 0);
+               ADictionary red = DictionaryFactory.combineDictionaries(a, b, 
m);
+
+               assertEquals(red.getNumberOfValues(2), 1);
+               assertEquals(red, Dictionary.create(new double[] {1, 0}));
+       }
+
+       @Test
+       public void combineMockingFirstAndDefault() {
+               ADictionary ad = Dictionary.create(new double[] {1, 2, 3, 4});
+               double[] ade = new double[] {0};
+               AColGroupCompressed a = mockSDC(ad, ade);
+               AColGroupCompressed b = mockSDC(ad, ade);
+
+               Map<Integer, Integer> m = new HashMap<>();
+               m.put(1, 0);
+               m.put(0, 1);
+               ADictionary red = DictionaryFactory.combineDictionaries(a, b, 
m);
+
+               assertEquals(red.getNumberOfValues(2), 2);
+               assertEquals(red, Dictionary.create(new double[] {1, 0, 0, 0}));
+       }
+
+       @Test
+       public void combineMockingMixed() {
+               ADictionary ad = Dictionary.create(new double[] {1, 2, 3, 4});
+               double[] ade = new double[] {0};
+               AColGroupCompressed a = mockSDC(ad, ade);
+               AColGroupCompressed b = mockSDC(ad, ade);
+
+               Map<Integer, Integer> m = new HashMap<>();
+               m.put(1, 0);
+               m.put(0, 1);
+               m.put(5, 2);
+               ADictionary red = DictionaryFactory.combineDictionaries(a, b, 
m);
+
+               assertEquals(red.getNumberOfValues(2), 3);
+               assertEquals(Dictionary.create(new double[] {1, 0, 0, 0, 0, 
1}), red);
+       }
+
+       @Test
+       public void combineMockingMixed2() {
+               ADictionary ad = Dictionary.create(new double[] {1, 2, 3, 4});
+               double[] ade = new double[] {0};
+               AColGroupCompressed a = mockSDC(ad, ade);
+               AColGroupCompressed b = mockSDC(ad, ade);
+
+               Map<Integer, Integer> m = new HashMap<>();
+               m.put(1, 0);
+               m.put(0, 1);
+               m.put(10, 2);
+               ADictionary red = DictionaryFactory.combineDictionaries(a, b, 
m);
+
+               assertEquals(red.getNumberOfValues(2), 3);
+               assertEquals(Dictionary.create(new double[] {1, 0, 0, 0, 0, 
2}), red);
+       }
+
+       @Test
+       public void combineMockingSparseDenseEmpty() {
+               try {
+
+                       ADictionary ad = Dictionary.create(new double[] {1, 2, 
3, 4});
+                       double[] ade = new double[] {0};
+                       AColGroupCompressed a = mockDDC(ad, 1);
+                       AColGroupCompressed b = mockSDC(ad, ade);
+
+                       Map<Integer, Integer> m = new HashMap<>();
+                       ADictionary red = 
DictionaryFactory.combineDictionaries(a, b, m);
+
+                       assertEquals(0, red.getNumberOfValues(2));
+                       assertEquals(Dictionary.createNoCheck(new double[] {}), 
red);
+               }
+               catch(Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+
+       @Test
+       public void combineMockingSparseDenseOne() {
+               try {
+
+                       ADictionary ad = Dictionary.create(new double[] {1, 2, 
3, 4});
+                       double[] ade = new double[] {0};
+                       AColGroupCompressed a = mockDDC(ad, 1);
+                       AColGroupCompressed b = mockSDC(ad, ade);
+
+                       Map<Integer, Integer> m = new HashMap<>();
+                       m.put(0, 0);
+                       ADictionary red = 
DictionaryFactory.combineDictionaries(a, b, m);
+                       assertEquals(1, red.getNumberOfValues(2));
+                       assertEquals(Dictionary.createNoCheck(new double[] {1, 
0}), red);
+               }
+               catch(Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+
+       @Test
+       public void combineMockingSparseDenseMixed1() {
+               try {
+
+                       ADictionary ad = Dictionary.create(new double[] {1, 2, 
3, 4});
+                       double[] ade = new double[] {0};
+                       AColGroupCompressed a = mockDDC(ad, 1);
+                       AColGroupCompressed b = mockSDC(ad, ade);
+
+                       Map<Integer, Integer> m = new HashMap<>();
+                       m.put(0, 1);
+                       m.put(1, 0);
+                       ADictionary red = 
DictionaryFactory.combineDictionaries(a, b, m);
+
+                       assertEquals(2, red.getNumberOfValues(2));
+                       assertEquals(Dictionary.createNoCheck(new double[] {2, 
0, 1, 0}), red);
+               }
+               catch(Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+
+       @Test
+       public void combineMockingSparseDenseMixed2() {
+               try {
+
+                       ADictionary ad = Dictionary.create(new double[] {1, 2, 
3, 4});
+                       double[] ade = new double[] {0};
+                       AColGroupCompressed a = mockDDC(ad, 1);
+                       AColGroupCompressed b = mockSDC(ad, ade);
+
+                       Map<Integer, Integer> m = new HashMap<>();
+                       m.put(0, 1);
+                       m.put(1, 0);
+                       m.put(4, 2);
+                       ADictionary red = 
DictionaryFactory.combineDictionaries(a, b, m);
+
+                       assertEquals(3, red.getNumberOfValues(2));
+                       assertEquals(Dictionary.createNoCheck(new double[] {2, 
0, 1, 0, 1, 1}), red);
+               }
+               catch(Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+
+       @Test
+       public void combineMockingSparseDenseMixed3() {
+               try {
+
+                       ADictionary ad = Dictionary.create(new double[] {1, 2, 
3, 4});
+                       double[] ade = new double[] {0};
+                       AColGroupCompressed a = mockDDC(ad, 1);
+                       AColGroupCompressed b = mockSDC(ad, ade);
+
+                       Map<Integer, Integer> m = new HashMap<>();
+                       m.put(0, 1);
+                       m.put(1, 0);
+                       m.put(5, 2);
+                       m.put(4, 3);
+                       ADictionary red = 
DictionaryFactory.combineDictionaries(a, b, m);
+
+                       assertEquals(4, red.getNumberOfValues(2));
+                       assertEquals(Dictionary.createNoCheck(new double[] {2, 
0, 1, 0, 2, 1, 1, 1}), red);
+               }
+               catch(Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+
+       private ASDC mockSDC(ADictionary ad, double[] def) {
+               ASDC a = mock(ASDC.class);
+               when(a.getCompType()).thenReturn(CompressionType.SDC);
+               when(a.getDictionary()).thenReturn(ad);
+               when(a.getDefaultTuple()).thenReturn(def);
+               when(a.getNumCols()).thenReturn(def.length);
+               return a;
+       }
+
+       private ColGroupDDC mockDDC(ADictionary ad, int nCol) {
+               ColGroupDDC a = mock(ColGroupDDC.class);
+               when(a.getCompType()).thenReturn(CompressionType.DDC);
+               when(a.getDictionary()).thenReturn(ad);
+               when(a.getNumCols()).thenReturn(nCol);
+               return a;
+       }
 }
diff --git 
a/src/test/java/org/apache/sysds/test/component/compress/mapping/MappingTests.java
 
b/src/test/java/org/apache/sysds/test/component/compress/mapping/MappingTests.java
index 0cd6a68d42..6227368c10 100644
--- 
a/src/test/java/org/apache/sysds/test/component/compress/mapping/MappingTests.java
+++ 
b/src/test/java/org/apache/sysds/test/component/compress/mapping/MappingTests.java
@@ -96,7 +96,7 @@ public class MappingTests {
                m = genMap(MapToFactory.create(size, max), expected, max, fill, 
seed);
        }
 
-       protected static AMapToData genMap(AMapToData m, int[] expected, int 
max, boolean fill, int seed) {
+       public static AMapToData genMap(AMapToData m, int[] expected, int max, 
boolean fill, int seed) {
                if(max <= 1)
                        return m;
                Random vals = new Random(seed);
diff --git 
a/src/test/java/org/apache/sysds/test/component/frame/FrameSerializationTest.java
 
b/src/test/java/org/apache/sysds/test/component/frame/FrameSerializationTest.java
index acb6b8cf3e..249680080a 100644
--- 
a/src/test/java/org/apache/sysds/test/component/frame/FrameSerializationTest.java
+++ 
b/src/test/java/org/apache/sysds/test/component/frame/FrameSerializationTest.java
@@ -36,6 +36,7 @@ import org.apache.sysds.common.Types.ValueType;
 import org.apache.sysds.runtime.frame.data.FrameBlock;
 import org.apache.sysds.test.TestUtils;
 import org.apache.sysds.test.component.frame.array.FrameArrayTests;
+import org.apache.sysds.test.component.frame.compress.FrameCompressTestUtils;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -67,6 +68,11 @@ public class FrameSerializationTest {
                                        tests.add(new Object[] {new 
FrameBlock(sch, FrameArrayTests.generateRandomString(sch.length, 32)), t});
                                }
                        }
+                       for(SerType t : SerType.values()) {
+                               tests.add(new Object[] 
{FrameCompressTestUtils.generateCompressableBlockRandomTypes(200, 4, 31), t});
+                               tests.add(new Object[] 
{FrameCompressTestUtils.generateCompressableBlockRandomTypes(102, 4, 2), t});
+                               tests.add(new Object[] 
{FrameCompressTestUtils.generateCompressableBlockRandomTypes(524, 4, 13), t});
+                       }
                }
                catch(Exception e) {
                        e.printStackTrace();
@@ -125,7 +131,7 @@ public class FrameSerializationTest {
                                return; // not valid test
                        FrameBlock back = new FrameBlock();
                        back = writableSerialize(frame, back);
-                       back = back.slice(0, frame.getNumRows()-1 , 0, 0);
+                       back = back.slice(0, frame.getNumRows() - 1, 0, 0);
                        ValueType[] v1 = back.getSchema();
                        back = writableSerialize(frame, back);
                        ValueType[] v2 = back.getSchema();
@@ -141,7 +147,9 @@ public class FrameSerializationTest {
        @Test
        public void estimateMemory() {
                // should always be true that in memory size is bigger than 
serialized size.
-               assertTrue(frame.getInMemorySize() > 
frame.getExactSerializedSize());
+
+               assertTrue(String.format(" %5d vs %5d :\n\n %s", 
frame.getInMemorySize(), frame.getExactSerializedSize(), frame),
+                       frame.getInMemorySize() > 
frame.getExactSerializedSize());
        }
 
        private static FrameBlock writableSerialize(FrameBlock in) throws 
Exception {
diff --git 
a/src/test/java/org/apache/sysds/test/component/frame/array/FrameArrayTests.java
 
b/src/test/java/org/apache/sysds/test/component/frame/array/FrameArrayTests.java
index 67c7505567..6419b85ba3 100644
--- 
a/src/test/java/org/apache/sysds/test/component/frame/array/FrameArrayTests.java
+++ 
b/src/test/java/org/apache/sysds/test/component/frame/array/FrameArrayTests.java
@@ -40,10 +40,10 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.sysds.common.Types.ValueType;
 import org.apache.sysds.runtime.DMLRuntimeException;
 import org.apache.sysds.runtime.compress.DMLCompressionException;
+import org.apache.sysds.runtime.frame.data.FrameBlock;
 import org.apache.sysds.runtime.frame.data.columns.Array;
 import org.apache.sysds.runtime.frame.data.columns.ArrayFactory;
 import org.apache.sysds.runtime.frame.data.columns.ArrayFactory.FrameArrayType;
-import org.apache.sysds.runtime.frame.data.columns.BitSetArray;
 import org.apache.sysds.runtime.frame.data.columns.BooleanArray;
 import org.apache.sysds.runtime.frame.data.columns.CharArray;
 import org.apache.sysds.runtime.frame.data.columns.DDCArray;
@@ -128,7 +128,6 @@ public class FrameArrayTests {
                        tests.add(new Object[] 
{ArrayFactory.create(generateRandomNullZeroString(67, 21)), 
FrameArrayType.STRING});
                        tests.add(new Object[] 
{ArrayFactory.create(generateRandomNullFloatString(67, 21)), 
FrameArrayType.STRING});
                        tests.add(new Object[] {ArrayFactory.create(new 
String[30]), FrameArrayType.STRING}); // all null
-
                        tests.add(new Object[] {ArrayFactory.create(new char[] 
{0, 0, 0, 0, 1, 1, 1}), FrameArrayType.CHARACTER});
                        tests.add(new Object[] {ArrayFactory.create(new char[] 
{'t', 't', 'f', 'f', 'T'}), FrameArrayType.CHARACTER});
                        tests.add(new Object[] {ArrayFactory.create(new char[] 
{'0', '2', '3', '4', '9'}), FrameArrayType.CHARACTER});
@@ -222,12 +221,12 @@ public class FrameArrayTests {
        @Test
        public void getSizeEstimateVsReal() {
                long memSize = a.getInMemorySize();
-               long estSize = ArrayFactory.getInMemorySize(a.getValueType(), 
a.size());
+               long estSize = ArrayFactory.getInMemorySize(a.getValueType(), 
a.size(),
+                       a.containsNull() || a instanceof OptionalArray);
+
                switch(a.getValueType()) {
                        case BOOLEAN:
-                               if(a instanceof BitSetArray)
-                                       estSize = 
BitSetArray.estimateInMemorySize(a.size());
-                               else
+                               if(a instanceof BooleanArray) // just in case 
we overwrite the BitSet to boolean Array type.
                                        estSize = 
BooleanArray.estimateInMemorySize(a.size());
                        default: // nothing
                }
@@ -458,6 +457,7 @@ public class FrameArrayTests {
 
        @SuppressWarnings("unchecked")
        public void testSetRange(int start, int end, int otherSize, int seed) {
+               FrameBlock.debug = true;
                try {
                        Array<?> other = create(a.getFrameArrayType(), 
otherSize, seed);
                        try {
@@ -1761,7 +1761,8 @@ public class FrameArrayTests {
                int nUnique = Math.max(size / 100, 2);
                switch(t) {
                        case STRING:
-                               return 
DDCArray.compressToDDC(ArrayFactory.create(generateRandomStringOpt(size, 
seed)));
+                               return DDCArray
+                                       
.compressToDDC(ArrayFactory.create(generateRandomStringNUniqueLengthOpt(size, 
seed, nUnique, 132)));
                        case BITSET:// not a thing
                        case BOOLEAN:
                                return 
DDCArray.compressToDDC(ArrayFactory.create(generateRandomBooleanOpt(size, 
seed)));
@@ -2130,7 +2131,7 @@ public class FrameArrayTests {
                return ret;
        }
 
-       protected static Boolean[] generateRandomBooleanOpt(int size, int seed) 
{
+       public static Boolean[] generateRandomBooleanOpt(int size, int seed) {
                Random r = new Random(seed);
                Boolean[] ret = new Boolean[size];
                for(int i = 0; i < size; i++) {
diff --git 
a/src/test/java/org/apache/sysds/test/component/frame/array/NegativeArrayTests.java
 
b/src/test/java/org/apache/sysds/test/component/frame/array/NegativeArrayTests.java
index ab98354b3f..68900fe5c9 100644
--- 
a/src/test/java/org/apache/sysds/test/component/frame/array/NegativeArrayTests.java
+++ 
b/src/test/java/org/apache/sysds/test/component/frame/array/NegativeArrayTests.java
@@ -54,8 +54,13 @@ public class NegativeArrayTests {
        }
 
        @Test(expected = DMLRuntimeException.class)
-       public void testEstimateMemorySizeInvalid() {
-               ArrayFactory.getInMemorySize(ValueType.UNKNOWN, 0);
+       public void testEstimateMemorySizeInvalid_1() {
+               ArrayFactory.getInMemorySize(ValueType.UNKNOWN, 0, false);
+       }
+
+       @Test(expected = DMLRuntimeException.class)
+       public void testEstimateMemorySizeInvalid_2() {
+               ArrayFactory.getInMemorySize(ValueType.UNKNOWN, 0, true);
        }
 
        @Test
diff --git 
a/src/test/java/org/apache/sysds/test/component/frame/compress/FrameCompressTest.java
 
b/src/test/java/org/apache/sysds/test/component/frame/compress/FrameCompressTest.java
index fc4e69d752..9ed4fa6747 100644
--- 
a/src/test/java/org/apache/sysds/test/component/frame/compress/FrameCompressTest.java
+++ 
b/src/test/java/org/apache/sysds/test/component/frame/compress/FrameCompressTest.java
@@ -23,14 +23,12 @@ import static org.junit.Assert.fail;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.sysds.common.Types.ValueType;
 import org.apache.sysds.runtime.frame.data.FrameBlock;
-import org.apache.sysds.runtime.frame.data.columns.Array;
-import org.apache.sysds.runtime.frame.data.columns.ArrayFactory;
 import 
org.apache.sysds.runtime.frame.data.compress.CompressedFrameBlockFactory;
 import org.apache.sysds.runtime.frame.data.compress.FrameCompressionSettings;
 import org.apache.sysds.runtime.frame.data.lib.FrameLibCompress;
 import org.apache.sysds.test.TestUtils;
-import org.apache.sysds.test.component.frame.array.FrameArrayTests;
 import org.junit.Test;
 
 public class FrameCompressTest {
@@ -38,13 +36,13 @@ public class FrameCompressTest {
 
        @Test
        public void testSingleThread() {
-               FrameBlock a = generateCompressableBlock(200, 5, 1232);
+               FrameBlock a = 
FrameCompressTestUtils.generateCompressableBlock(200, 5, 1232, 
ValueType.STRING);
                runTest(a, 1);
        }
 
        @Test
        public void testParallel() {
-               FrameBlock a = generateCompressableBlock(200, 5, 1232);
+               FrameBlock a = 
FrameCompressTestUtils.generateCompressableBlock(200, 5, 1232, 
ValueType.STRING);
                runTest(a, 4);
        }
 
@@ -70,12 +68,4 @@ public class FrameCompressTest {
                }
        }
 
-       private FrameBlock generateCompressableBlock(int rows, int cols, int 
seed) {
-               Array<?>[] data = new Array<?>[cols];
-               for(int i = 0; i < cols; i++) {
-                       data[i] = ArrayFactory.create(//
-                               
FrameArrayTests.generateRandomStringNUniqueLengthOpt(rows, seed + i, i + 1, 55 
+ i));
-               }
-               return new FrameBlock(data);
-       }
 }
diff --git 
a/src/test/java/org/apache/sysds/test/component/frame/compress/FrameCompressTestUtils.java
 
b/src/test/java/org/apache/sysds/test/component/frame/compress/FrameCompressTestUtils.java
new file mode 100644
index 0000000000..bc512c5567
--- /dev/null
+++ 
b/src/test/java/org/apache/sysds/test/component/frame/compress/FrameCompressTestUtils.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.component.frame.compress;
+
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sysds.common.Types.ValueType;
+import org.apache.sysds.runtime.frame.data.FrameBlock;
+import org.apache.sysds.runtime.frame.data.columns.Array;
+import org.apache.sysds.runtime.frame.data.columns.ArrayFactory;
+import org.apache.sysds.test.component.frame.array.FrameArrayTests;
+
+public class FrameCompressTestUtils {
+       protected static final Log LOG = 
LogFactory.getLog(FrameCompressTest.class.getName());
+
+       public static FrameBlock generateCompressableBlock(int rows, int cols, 
int seed, ValueType vt) {
+               Array<?>[] data = new Array<?>[cols];
+               for(int i = 0; i < cols; i++)
+                       data[i] = generateArray(rows, seed + i, i + 1, vt);
+
+               return new FrameBlock(data);
+       }
+
+       public static FrameBlock generateCompressableBlockRandomTypes(int rows, 
int cols, int seed) {
+               Array<?>[] data = new Array<?>[cols];
+               Random r = new Random(seed + 13);
+               for(int i = 0; i < cols; i++) {
+                       ValueType vt = 
ValueType.values()[r.nextInt(ValueType.values().length)];
+                       data[i] = generateArray(rows, seed + i, i + 1, vt);
+               }
+
+               return new FrameBlock(data);
+       }
+
+       public static Array<?> generateArray(int size, int seed, int nUnique, 
ValueType vt) {
+               switch(vt) {
+                       case BOOLEAN:
+                               return 
ArrayFactory.create(FrameArrayTests.generateRandomBooleanOpt(size, seed));
+                       case INT32:
+                               return 
ArrayFactory.create(FrameArrayTests.generateRandomIntegerNUniqueLengthOpt(size, 
seed, nUnique));
+                       case INT64:
+                               return 
ArrayFactory.create(FrameArrayTests.generateRandomLongNUniqueLengthOpt(size, 
seed, nUnique));
+                       case FP32:
+                               return 
ArrayFactory.create(FrameArrayTests.generateRandomFloatNUniqueLengthOpt(size, 
seed, nUnique));
+                       case FP64:
+                               return 
ArrayFactory.create(FrameArrayTests.generateRandomDoubleNUniqueLengthOpt(size, 
seed, nUnique));
+                       case CHARACTER:
+                               return 
ArrayFactory.create(FrameArrayTests.generateRandomCharacterNUniqueLengthOpt(size,
 seed, nUnique));
+                       case STRING:
+                       default:
+                               return 
ArrayFactory.create(FrameArrayTests.generateRandomStringNUniqueLengthOpt(size, 
seed, nUnique, 132));
+               }
+       }
+}

Reply via email to