This is an automated email from the ASF dual-hosted git repository.

baunsgaard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/main by this push:
     new d398c50227 [SYSTEMDS-3532] Parallel writer compressed
d398c50227 is described below

commit d398c50227fd6d75cc65d8c3fd9e796d3b86aa7b
Author: baunsgaard <[email protected]>
AuthorDate: Tue Apr 25 15:16:43 2023 +0200

    [SYSTEMDS-3532] Parallel writer compressed
    
    This commit adds a parallel writer for compressed matrices to disk.
    It also updates and fixes bugs in the compressed scheme apply, and
    changes the API for applying compression schemes.
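    
    A minimal sketch of the revised scheme-apply flow (illustrative only;
    the input MatrixBlock mb is assumed, the factory and scheme names are
    taken from the diff below):
    
        IColIndex cols = ColIndexFactory.create(mb.getNumColumns());
        ICLAScheme sc = SchemeFactory.create(cols, CompressionType.DDC);
        sc = sc.update(mb);           // make the scheme consistent with the values in mb
        AColGroup g = sc.encode(mb);  // force-encode mb with the updated scheme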
    
    Closes #1782
---
 .../sysds/runtime/compress/colgroup/AColGroup.java |   2 +-
 .../runtime/compress/colgroup/ColGroupDDC.java     |   2 +-
 .../runtime/compress/colgroup/ColGroupFactory.java |   2 +-
 .../colgroup/dictionary/DictionaryFactory.java     |   2 +-
 .../compress/colgroup/indexes/RangeIndex.java      |   2 +-
 .../compress/colgroup/indexes/SingleIndex.java     |   2 +-
 .../compress/colgroup/indexes/TwoIndex.java        |   2 +-
 .../compress/colgroup/mapping/AMapToData.java      |   2 +-
 .../compress/colgroup/scheme/ACLAScheme.java       |  57 +++++++
 .../compress/colgroup/scheme/ConstScheme.java      | 108 +++----------
 .../compress/colgroup/scheme/DDCScheme.java        |  80 ++++-----
 .../scheme/{DDCScheme.java => DDCSchemeMC.java}    | 107 ++++++++----
 .../compress/colgroup/scheme/DDCSchemeSC.java      | 154 ++++++++++++++++++
 .../compress/colgroup/scheme/EmptyScheme.java      |  83 +---------
 .../compress/colgroup/scheme/ICLAScheme.java       |  39 ++++-
 .../compress/colgroup/scheme/SchemeFactory.java    |  56 +++++++
 .../runtime/compress/io/ReaderCompressed.java      |   7 +-
 .../runtime/compress/io/WriterCompressed.java      | 159 +++++++++++++-----
 .../sysds/runtime/compress/lib/CLALibCombine.java  |  50 ++++--
 .../compress/plan/CompressionPlanFactory.java      |  67 ++++++++
 .../{utils/DCounts.java => plan/IPlanEncode.java}  |  40 +++--
 .../runtime/compress/plan/NaivePlanEncode.java     | 180 +++++++++++++++++++++
 .../compress/readers/ReaderColumnSelection.java    |  12 +-
 .../utils/{DArrCounts.java => ACount.java}         |  51 ++++--
 .../compress/utils/DblArrayCountHashMap.java       |  33 +++-
 .../runtime/compress/utils/DoubleCountHashMap.java |  22 +++
 .../runtime/io/WriterBinaryBlockParallel.java      |   1 -
 .../runtime/transform/encode/CompressedEncode.java |   6 -
 .../colgroup/scheme/CLAConstSchemeTest.java        | 110 +------------
 .../compress/colgroup/scheme/CLADDCSchemeTest.java |  44 +++--
 .../colgroup/scheme/CLAEmptySchemeTest.java        | 118 ++------------
 .../sysds/test/component/compress/io/IOTest.java   |   4 +-
 .../compress/plan/CustomEncodePlanTest.java        | 143 ++++++++++++++++
 .../compress/plan/EncodePerformanceTest.java       |  86 ++++++++++
 34 files changed, 1240 insertions(+), 593 deletions(-)

diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/colgroup/AColGroup.java 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/AColGroup.java
index eb3cd40683..e6cd408309 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/AColGroup.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/AColGroup.java
@@ -619,7 +619,7 @@ public abstract class AColGroup implements Serializable {
        @Override
        public String toString() {
                StringBuilder sb = new StringBuilder();
-               sb.append(String.format("%15s", "ColGroupType: "));
+               sb.append(String.format("%15s", "Type: "));
                sb.append(this.getClass().getSimpleName());
                sb.append(String.format("\n%15s", "Columns: "));
                sb.append(_colIndexes);
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupDDC.java 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupDDC.java
index 71011c4d42..2d03f406db 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupDDC.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupDDC.java
@@ -550,7 +550,7 @@ public class ColGroupDDC extends APreAgg implements 
AMapToDataGroup {
        public String toString() {
                StringBuilder sb = new StringBuilder();
                sb.append(super.toString());
-               sb.append(String.format("\n%15s ", "Data: "));
+               sb.append(String.format("\n%15s", "Data: "));
                sb.append(_data);
                return sb.toString();
        }
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupFactory.java 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupFactory.java
index dca37792fd..bf4900e1fb 100644
--- 
a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupFactory.java
+++ 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupFactory.java
@@ -54,7 +54,7 @@ import org.apache.sysds.runtime.compress.cost.ACostEstimate;
 import org.apache.sysds.runtime.compress.estim.CompressedSizeInfo;
 import org.apache.sysds.runtime.compress.estim.CompressedSizeInfoColGroup;
 import org.apache.sysds.runtime.compress.readers.ReaderColumnSelection;
-import org.apache.sysds.runtime.compress.utils.DCounts;
+import org.apache.sysds.runtime.compress.utils.ACount.DCounts;
 import org.apache.sysds.runtime.compress.utils.DblArray;
 import org.apache.sysds.runtime.compress.utils.DblArrayCountHashMap;
 import org.apache.sysds.runtime.compress.utils.DoubleCountHashMap;
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/colgroup/dictionary/DictionaryFactory.java
 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/dictionary/DictionaryFactory.java
index c1b399f7df..bb42122f5e 100644
--- 
a/src/main/java/org/apache/sysds/runtime/compress/colgroup/dictionary/DictionaryFactory.java
+++ 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/dictionary/DictionaryFactory.java
@@ -30,7 +30,7 @@ import 
org.apache.sysds.runtime.compress.DMLCompressionException;
 import org.apache.sysds.runtime.compress.bitmap.ABitmap;
 import org.apache.sysds.runtime.compress.bitmap.Bitmap;
 import org.apache.sysds.runtime.compress.bitmap.MultiColBitmap;
-import org.apache.sysds.runtime.compress.utils.DArrCounts;
+import org.apache.sysds.runtime.compress.utils.ACount.DArrCounts;
 import org.apache.sysds.runtime.compress.utils.DblArrayCountHashMap;
 import org.apache.sysds.runtime.compress.utils.DoubleCountHashMap;
 import org.apache.sysds.runtime.data.SparseBlock;
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/colgroup/indexes/RangeIndex.java
 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/indexes/RangeIndex.java
index be5a1b7f90..e6cfef1485 100644
--- 
a/src/main/java/org/apache/sysds/runtime/compress/colgroup/indexes/RangeIndex.java
+++ 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/indexes/RangeIndex.java
@@ -167,7 +167,7 @@ public class RangeIndex extends AColIndex {
        public String toString() {
                StringBuilder sb = new StringBuilder();
                sb.append(this.getClass().getSimpleName());
-               sb.append(" [");
+               sb.append("[");
                sb.append(l);
                sb.append(" -> ");
                sb.append(u);
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/colgroup/indexes/SingleIndex.java
 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/indexes/SingleIndex.java
index aada2ff31d..224cec1180 100644
--- 
a/src/main/java/org/apache/sysds/runtime/compress/colgroup/indexes/SingleIndex.java
+++ 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/indexes/SingleIndex.java
@@ -112,7 +112,7 @@ public class SingleIndex extends AColIndex {
        public String toString() {
                StringBuilder sb = new StringBuilder();
                sb.append(this.getClass().getSimpleName());
-               sb.append(" [");
+               sb.append("[");
                sb.append(idx);
                sb.append("]");
                return sb.toString();
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/colgroup/indexes/TwoIndex.java
 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/indexes/TwoIndex.java
index 4faa9b05e2..8cbdd63883 100644
--- 
a/src/main/java/org/apache/sysds/runtime/compress/colgroup/indexes/TwoIndex.java
+++ 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/indexes/TwoIndex.java
@@ -137,7 +137,7 @@ public class TwoIndex extends AColIndex {
        public String toString() {
                StringBuilder sb = new StringBuilder();
                sb.append(this.getClass().getSimpleName());
-               sb.append(" [");
+               sb.append("[");
                sb.append(id1);
                sb.append(", ");
                sb.append(id2);
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/colgroup/mapping/AMapToData.java
 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/mapping/AMapToData.java
index a506734a9c..7076f971de 100644
--- 
a/src/main/java/org/apache/sysds/runtime/compress/colgroup/mapping/AMapToData.java
+++ 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/mapping/AMapToData.java
@@ -825,7 +825,7 @@ public abstract class AMapToData implements Serializable {
                final int sz = size();
                StringBuilder sb = new StringBuilder();
                sb.append(this.getClass().getSimpleName());
-               sb.append(" [");
+               sb.append("[");
                for(int i = 0; i < sz - 1; i++)
                        sb.append(getIndex(i) + ", ");
                sb.append(getIndex(sz - 1));
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/colgroup/scheme/ACLAScheme.java
 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/scheme/ACLAScheme.java
new file mode 100644
index 0000000000..52510d34fc
--- /dev/null
+++ 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/scheme/ACLAScheme.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.compress.colgroup.scheme;
+
+import org.apache.sysds.runtime.compress.colgroup.AColGroup;
+import org.apache.sysds.runtime.compress.colgroup.indexes.IColIndex;
+import org.apache.sysds.runtime.matrix.data.MatrixBlock;
+
+public abstract class ACLAScheme implements ICLAScheme {
+       protected final IColIndex cols;
+
+       protected ACLAScheme(IColIndex cols) {
+               this.cols = cols;
+       }
+
+       protected IColIndex getColIndices() {
+               return cols;
+       }
+
+       @Override
+       public AColGroup encode(MatrixBlock data) {
+               return encode(data, getColIndices());
+       }
+
+       @Override
+       public ICLAScheme update(MatrixBlock data) {
+               return update(data, getColIndices());
+       }
+
+       protected final void validate(MatrixBlock data, IColIndex columns) 
throws IllegalArgumentException {
+               if(columns.size() != cols.size())
+                       throw new IllegalArgumentException(
+                               "Invalid number of columns to encode expected: 
" + cols.size() + " but got: " + columns.size());
+
+               final int nCol = data.getNumColumns();
+               if(nCol < cols.get(cols.size() - 1))
+                       throw new IllegalArgumentException("Invalid columns to 
encode with max col:" + nCol+ " list of columns: "+ columns);
+       }
+
+}
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/colgroup/scheme/ConstScheme.java
 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/scheme/ConstScheme.java
index 13169f4b3f..b2ec96296d 100644
--- 
a/src/main/java/org/apache/sysds/runtime/compress/colgroup/scheme/ConstScheme.java
+++ 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/scheme/ConstScheme.java
@@ -19,113 +19,45 @@
 
 package org.apache.sysds.runtime.compress.colgroup.scheme;
 
+import org.apache.commons.lang.NotImplementedException;
 import org.apache.sysds.runtime.compress.colgroup.AColGroup;
 import org.apache.sysds.runtime.compress.colgroup.ColGroupConst;
 import org.apache.sysds.runtime.compress.colgroup.indexes.IColIndex;
-import org.apache.sysds.runtime.data.SparseBlock;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 
-public class ConstScheme implements ICLAScheme {
+public class ConstScheme extends ACLAScheme {
 
-       /** The instance of a constant column group that in all cases here 
would be returned to be the same */
-       final ColGroupConst g;
+       final double[] vals;
+       boolean initialized = false;
 
-       protected ConstScheme(ColGroupConst g) {
-               this.g = g;
+       private ConstScheme(IColIndex cols, double[] vals, boolean initialized) 
{
+               super(cols);
+               this.vals = vals;
+               this.initialized = initialized;
        }
 
        public static ICLAScheme create(ColGroupConst g) {
-               return new ConstScheme(g);
+               return new ConstScheme(g.getColIndices(), g.getValues(), true);
        }
 
-       @Override
-       public AColGroup encode(MatrixBlock data) {
-               return encode(data, g.getColIndices(), g.getValues());
+       public static ICLAScheme create(IColIndex cols) {
+               return new ConstScheme(cols, new double[cols.size()], false);
        }
 
        @Override
-       public AColGroup encode(MatrixBlock data, IColIndex columns) {
-               if(columns.size() != g.getColIndices().size())
-                       throw new IllegalArgumentException("Invalid columns to 
encode");
-               return encode(data, columns, g.getValues());
-       }
-
-       private AColGroup encode(final MatrixBlock data, final IColIndex cols, 
final double[] values) {
-               final int nCol = data.getNumColumns();
-               final int nRow = data.getNumRows();
-               if(nCol < cols.get(cols.size() - 1)) {
-                       LOG.warn("Invalid to encode matrix with less columns 
than encode scheme max column");
-                       return null;
-               }
-               else if(data.isEmpty()) {
-                       LOG.warn("Invalid to encode an empty matrix into 
constant column group");
-                       return null; // Invalid to encode this.
-               }
-               else if(data.isInSparseFormat())
-                       return encodeSparse(data, cols, values, nRow, nCol);
-               else if(data.getDenseBlock().isContiguous())
-                       return encodeDense(data, cols, values, nRow, nCol);
-               else
-                       return encodeGeneric(data, cols, values, nRow, nCol);
-       }
-
-       private AColGroup encodeDense(final MatrixBlock data, final IColIndex 
cols, final double[] values, final int nRow,
-               final int nCol) {
-               final double[] dv = data.getDenseBlockValues();
-               for(int r = 0; r < nRow; r++) {
-                       final int off = r * nCol;
-                       for(int ci = 0; ci < cols.size(); ci++)
-                               if(dv[off + cols.get(ci)] != values[ci])
-                                       return null;
-               }
-               return returnG(cols);
+       protected IColIndex getColIndices() {
+               return cols;
        }
 
-       private AColGroup encodeSparse(final MatrixBlock data, final IColIndex 
cols, final double[] values, final int nRow,
-               final int nCol) {
-               SparseBlock sb = data.getSparseBlock();
-               for(int r = 0; r < nRow; r++) {
-                       if(sb.isEmpty(r))
-                               return null;
-
-                       final int apos = sb.pos(r);
-                       final int alen = apos + sb.size(r);
-                       final double[] aval = sb.values(r);
-                       final int[] aix = sb.indexes(r);
-                       int p = 0; // pointer into cols;
-                       while(values[p] == 0.0)
-                               // technically also check for&& p < cols.length
-                               // but this verification is indirectly 
maintained
-                               p++;
-                       for(int j = apos; j < alen && p < cols.size(); j++) {
-                               if(aix[j] == cols.get(p)) {
-                                       if(aval[j] != values[p])
-                                               return null;
-                                       p++;
-                                       while(p < cols.size() && values[p] == 
0.0)
-                                               p++;
-                               }
-                               else if(aix[j] > cols.get(p))
-                                       return null; // not matching
-                       }
-               }
-               return returnG(cols);
-       }
-
-       private AColGroup encodeGeneric(final MatrixBlock data, final IColIndex 
cols, final double[] values, final int nRow,
-               final int nCol) {
-               for(int r = 0; r < nRow; r++)
-                       for(int ci = 0; ci < cols.size(); ci++)
-                               if(data.quickGetValue(r, cols.get(ci)) != 
values[ci])
-                                       return null;
-               return returnG(cols);
+       @Override
+       public ICLAScheme update(MatrixBlock data, IColIndex columns) {
+               throw new NotImplementedException();
        }
 
-       private AColGroup returnG(IColIndex columns) {
-               if(columns == g.getColIndices())
-                       return g;// great!
-               else
-                       return ColGroupConst.create(columns, g.getValues());
+       @Override
+       public AColGroup encode(MatrixBlock data, IColIndex columns) {
+               validate(data, columns);
+               return ColGroupConst.create(columns, vals);
        }
 
 }
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/colgroup/scheme/DDCScheme.java
 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/scheme/DDCScheme.java
index f6aacd71ad..75a63ead60 100644
--- 
a/src/main/java/org/apache/sysds/runtime/compress/colgroup/scheme/DDCScheme.java
+++ 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/scheme/DDCScheme.java
@@ -19,41 +19,17 @@
 
 package org.apache.sysds.runtime.compress.colgroup.scheme;
 
-import org.apache.sysds.runtime.compress.colgroup.AColGroup;
 import org.apache.sysds.runtime.compress.colgroup.ColGroupDDC;
 import org.apache.sysds.runtime.compress.colgroup.dictionary.ADictionary;
-import org.apache.sysds.runtime.compress.colgroup.indexes.ColIndexFactory;
 import org.apache.sysds.runtime.compress.colgroup.indexes.IColIndex;
-import org.apache.sysds.runtime.compress.colgroup.mapping.AMapToData;
-import org.apache.sysds.runtime.compress.colgroup.mapping.MapToFactory;
-import org.apache.sysds.runtime.compress.readers.ReaderColumnSelection;
-import org.apache.sysds.runtime.compress.utils.DblArray;
-import org.apache.sysds.runtime.compress.utils.DblArrayCountHashMap;
-import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 
-public class DDCScheme implements ICLAScheme {
+public abstract class DDCScheme extends ACLAScheme {
 
-       final private IColIndex cols;
-       final private int nUnique;
-       final private DblArrayCountHashMap map;
-       final private ADictionary dict;
-
-       private DDCScheme(ColGroupDDC g) {
-               this.cols = g.getColIndices();
-               this.nUnique = g.getNumValues();
-               this.dict = g.getDictionary();
-               final MatrixBlock mbDict = 
dict.getMBDict(this.cols.size()).getMatrixBlock();
-               final int dictRows = mbDict.getNumRows();
-               final int dictCols = mbDict.getNumColumns();
-
-               // Read the mapping data and materialize map.
-               map = new DblArrayCountHashMap(dictRows, dictCols);
-               final ReaderColumnSelection r = 
ReaderColumnSelection.createReader(mbDict, //
-                       ColIndexFactory.create(dictCols), false, 0, dictRows);
-               DblArray d = null;
-               while((d = r.nextRow()) != null)
-                       map.increment(d);
+       // TODO make it into a soft reference
+       protected ADictionary lastDict;
 
+       protected DDCScheme(IColIndex cols) {
+               super(cols);
        }
 
        /**
@@ -62,36 +38,36 @@ public class DDCScheme implements ICLAScheme {
         * @param g A DDC Column group
         * @return A DDC Compression scheme
         */
-       public static ICLAScheme create(ColGroupDDC g) {
-               if(g.getColIndices().size() == 1)
-                       return null;
-               return new DDCScheme(g);
+       public static DDCScheme create(ColGroupDDC g) {
+               return g.getNumCols() == 1 ? new DDCSchemeSC(g) : new 
DDCSchemeMC(g);
        }
 
-       @Override
-       public AColGroup encode(MatrixBlock data) {
-               return encode(data, cols);
+       /**
+        * Create a scheme for the DDC compression given a list of columns.
+        * 
+        * @param cols The columns to compress
+        * @return A DDC Compression scheme
+        */
+       public static DDCScheme create(IColIndex cols) {
+               return cols.size() == 1 ? new DDCSchemeSC(cols) : new 
DDCSchemeMC(cols);
        }
 
        @Override
-       public AColGroup encode(MatrixBlock data, IColIndex columns) {
-               if(columns.size() != cols.size())
-                       throw new IllegalArgumentException("Invalid columns to 
encode");
-               final int nRow = data.getNumRows();
-               final ReaderColumnSelection reader = 
ReaderColumnSelection.createReader(//
-                       data, columns, false, 0, nRow);
-               final AMapToData d = MapToFactory.create(nRow,nUnique);
+       protected final IColIndex getColIndices() {
+               return cols;
+       }
 
-               DblArray cellVals;
-               while((cellVals = reader.nextRow()) != null) {
-                       final int row = reader.getCurrentRowIndex();
-                       final int id = map.getId(cellVals);
-                       if(id == -1)
-                               return null;
-                       d.set(row, id);
-               }
+       protected abstract Object getMap();
 
-               return ColGroupDDC.create(columns, dict, d, null);
+       @Override
+       public final String toString() {
+               StringBuilder sb = new StringBuilder();
+               sb.append(this.getClass().getSimpleName());
+               sb.append("\nCols: ");
+               sb.append(cols);
+               sb.append("\nMap:  ");
+               sb.append(getMap());
+               return sb.toString();
        }
 
 }
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/colgroup/scheme/DDCScheme.java
 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/scheme/DDCSchemeMC.java
similarity index 50%
copy from 
src/main/java/org/apache/sysds/runtime/compress/colgroup/scheme/DDCScheme.java
copy to 
src/main/java/org/apache/sysds/runtime/compress/colgroup/scheme/DDCSchemeMC.java
index f6aacd71ad..7917b97e14 100644
--- 
a/src/main/java/org/apache/sysds/runtime/compress/colgroup/scheme/DDCScheme.java
+++ 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/scheme/DDCSchemeMC.java
@@ -21,7 +21,7 @@ package org.apache.sysds.runtime.compress.colgroup.scheme;
 
 import org.apache.sysds.runtime.compress.colgroup.AColGroup;
 import org.apache.sysds.runtime.compress.colgroup.ColGroupDDC;
-import org.apache.sysds.runtime.compress.colgroup.dictionary.ADictionary;
+import org.apache.sysds.runtime.compress.colgroup.dictionary.DictionaryFactory;
 import org.apache.sysds.runtime.compress.colgroup.indexes.ColIndexFactory;
 import org.apache.sysds.runtime.compress.colgroup.indexes.IColIndex;
 import org.apache.sysds.runtime.compress.colgroup.mapping.AMapToData;
@@ -31,67 +31,104 @@ import org.apache.sysds.runtime.compress.utils.DblArray;
 import org.apache.sysds.runtime.compress.utils.DblArrayCountHashMap;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 
-public class DDCScheme implements ICLAScheme {
+public class DDCSchemeMC extends DDCScheme {
 
-       final private IColIndex cols;
-       final private int nUnique;
-       final private DblArrayCountHashMap map;
-       final private ADictionary dict;
+       private final DblArray emptyRow;
 
-       private DDCScheme(ColGroupDDC g) {
-               this.cols = g.getColIndices();
-               this.nUnique = g.getNumValues();
-               this.dict = g.getDictionary();
-               final MatrixBlock mbDict = 
dict.getMBDict(this.cols.size()).getMatrixBlock();
+       private final DblArrayCountHashMap map;
+
+       protected DDCSchemeMC(ColGroupDDC g) {
+               super(g.getColIndices());
+               this.lastDict = g.getDictionary();
+               final MatrixBlock mbDict = 
lastDict.getMBDict(this.cols.size()).getMatrixBlock();
                final int dictRows = mbDict.getNumRows();
                final int dictCols = mbDict.getNumColumns();
 
                // Read the mapping data and materialize map.
-               map = new DblArrayCountHashMap(dictRows, dictCols);
+               map = new DblArrayCountHashMap(dictRows * 2, dictCols);
                final ReaderColumnSelection r = 
ReaderColumnSelection.createReader(mbDict, //
                        ColIndexFactory.create(dictCols), false, 0, dictRows);
                DblArray d = null;
                while((d = r.nextRow()) != null)
                        map.increment(d);
 
+               emptyRow = new DblArray(new double[dictCols]);
        }
 
-       /**
-        * Create a scheme for the DDC compression given
-        * 
-        * @param g A DDC Column group
-        * @return A DDC Compression scheme
-        */
-       public static ICLAScheme create(ColGroupDDC g) {
-               if(g.getColIndices().size() == 1)
-                       return null;
-               return new DDCScheme(g);
+       protected DDCSchemeMC(IColIndex cols) {
+               super(cols);
+               final int nCol = cols.size();
+               this.map = new DblArrayCountHashMap(4, nCol);
+               this.emptyRow = new DblArray(new double[nCol]);
        }
 
        @Override
-       public AColGroup encode(MatrixBlock data) {
-               return encode(data, cols);
+       protected final Object getMap() {
+               return map;
+       }
+
+       @Override
+       public ICLAScheme update(MatrixBlock data, IColIndex columns) {
+
+               validate(data, columns);
+               final int nRow = data.getNumRows();
+               final ReaderColumnSelection reader = 
ReaderColumnSelection.createReader(//
+                       data, columns, false, 0, nRow);
+               DblArray d = null;
+               int r = 0;
+               while((d = reader.nextRow()) != null) {
+                       final int cr = reader.getCurrentRowIndex();
+                       if(cr != r) {
+                               map.increment(emptyRow, cr - r);
+                               r = cr;
+                       }
+                       map.increment(d);
+                       r++;
+               }
+
+               if(r < nRow)
+                       map.increment(emptyRow,  nRow - r - 1);
+
+               return this;
        }
 
        @Override
        public AColGroup encode(MatrixBlock data, IColIndex columns) {
-               if(columns.size() != cols.size())
-                       throw new IllegalArgumentException("Invalid columns to 
encode");
+
+               validate(data, columns);
                final int nRow = data.getNumRows();
                final ReaderColumnSelection reader = 
ReaderColumnSelection.createReader(//
                        data, columns, false, 0, nRow);
-               final AMapToData d = MapToFactory.create(nRow,nUnique);
+               final AMapToData d = MapToFactory.create(nRow, map.size());
 
                DblArray cellVals;
-               while((cellVals = reader.nextRow()) != null) {
-                       final int row = reader.getCurrentRowIndex();
-                       final int id = map.getId(cellVals);
-                       if(id == -1)
-                               return null;
-                       d.set(row, id);
-               }
+               int emptyIdx = map.getId(emptyRow);
+               if(emptyIdx == -1){
 
-               return ColGroupDDC.create(columns, dict, d, null);
+                       while((cellVals = reader.nextRow()) != null) {
+                               final int row = reader.getCurrentRowIndex();
+                               final int id = map.getId(cellVals);
+                               d.set(row, id);
+                       }
+               }
+               else{
+                       int r = 0;
+                       while((cellVals = reader.nextRow()) != null) {
+                               final int row = reader.getCurrentRowIndex();
+                               if(row != r) {
+                                       while(r < row)
+                                               d.set(r++, emptyIdx);
+                               }
+                               final int id = map.getId(cellVals);
+                               d.set(row, id);
+                               r++;
+                       }
+                       while(r < nRow)
+                               d.set(r++, emptyIdx);
+               }
+               if(lastDict == null || 
lastDict.getNumberOfValues(columns.size()) != map.size())
+                       lastDict = DictionaryFactory.create(map, 
columns.size(), false, data.getSparsity());
+               return ColGroupDDC.create(columns, lastDict, d, null);
        }
 
 }
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/colgroup/scheme/DDCSchemeSC.java
 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/scheme/DDCSchemeSC.java
new file mode 100644
index 0000000000..5fde8fe42e
--- /dev/null
+++ 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/scheme/DDCSchemeSC.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.compress.colgroup.scheme;
+
+import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.compress.colgroup.AColGroup;
+import org.apache.sysds.runtime.compress.colgroup.ColGroupDDC;
+import org.apache.sysds.runtime.compress.colgroup.dictionary.DictionaryFactory;
+import org.apache.sysds.runtime.compress.colgroup.indexes.IColIndex;
+import org.apache.sysds.runtime.compress.colgroup.mapping.AMapToData;
+import org.apache.sysds.runtime.compress.colgroup.mapping.MapToFactory;
+import org.apache.sysds.runtime.compress.utils.DoubleCountHashMap;
+import org.apache.sysds.runtime.data.DenseBlock;
+import org.apache.sysds.runtime.data.SparseBlock;
+import org.apache.sysds.runtime.matrix.data.MatrixBlock;
+
+public class DDCSchemeSC extends DDCScheme {
+
+       final private DoubleCountHashMap map;
+
+       protected DDCSchemeSC(ColGroupDDC g) {
+               super(g.getColIndices());
+               if(cols.size() != 1)
+                       throw new DMLRuntimeException("Invalid single col 
scheme");
+               this.lastDict = g.getDictionary();
+               int unique = lastDict.getNumberOfValues(1);
+               map = new DoubleCountHashMap(unique);
+               for(int i = 0; i < unique; i++)
+                       map.increment(lastDict.getValue(i));
+       }
+
+       protected DDCSchemeSC(IColIndex cols) {
+               super(cols);
+               this.map = new DoubleCountHashMap(4);
+       }
+
+       @Override
+       protected final Object getMap() {
+               return map;
+       }
+
+       @Override
+       public ICLAScheme update(MatrixBlock data, IColIndex columns) {
+               validate(data, columns);
+               final int col = columns.get(0);
+               if(data.isEmpty())
+                       map.increment(0, data.getNumRows());
+               else if(data.isInSparseFormat())
+                       updateSparse(data, col);
+               else if(data.getDenseBlock().isContiguous())
+                       updateDense(data, col);
+               else
+                       updateGeneric(data, col);
+               return this;
+       }
+
+       private void updateSparse(MatrixBlock data, int col) {
+               final int nRow = data.getNumRows();
+               final SparseBlock sb = data.getSparseBlock();
+               for(int i = 0; i < nRow; i++)
+                       map.increment(sb.get(i, col));
+
+       }
+
+       private void updateDense(MatrixBlock data, int col) {
+               final int nRow = data.getNumRows();
+               final double[] vals = data.getDenseBlockValues();
+               final int nCol = data.getNumColumns();
+               final int max = nRow * nCol; // guaranteed lower than intmax.
+               for(int off = col; off < max; off += nCol)
+                       map.increment(vals[off]);
+
+       }
+
+       private void updateGeneric(MatrixBlock data, int col) {
+               final int nRow = data.getNumRows();
+               final DenseBlock db = data.getDenseBlock();
+               for(int i = 0; i < nRow; i++) {
+                       final double[] c = db.values(i);
+                       final int off = db.pos(i) + col;
+                       map.increment(c[off]);
+               }
+       }
+
+       @Override
+       public AColGroup encode(MatrixBlock data, IColIndex columns) {
+
+               validate(data, columns);
+               final int nRow = data.getNumRows();
+               final AMapToData d = MapToFactory.create(nRow, map.size());
+               encode(data, d, cols.get(0));
+               if(lastDict == null || 
lastDict.getNumberOfValues(columns.size()) != map.size())
+                       lastDict = DictionaryFactory.create(map);
+
+               return ColGroupDDC.create(columns, lastDict, d, null);
+       }
+
+       private void encode(MatrixBlock data, AMapToData d, int col) {
+               if(data.isEmpty())
+                       d.fill(map.getId(0.0));
+               else if(data.isInSparseFormat())
+                       encodeSparse(data, d, col);
+               else if(data.getDenseBlock().isContiguous())
+                       encodeDense(data, d, col);
+               else
+                       encodeGeneric(data, d, col);
+       }
+
+       private void encodeSparse(MatrixBlock data, AMapToData d, int col) {
+               final int nRow = data.getNumRows();
+               final SparseBlock sb = data.getSparseBlock();
+               for(int i = 0; i < nRow; i++)
+                       d.set(i, map.getId(sb.get(i, col)));
+
+       }
+
+       private void encodeDense(MatrixBlock data, AMapToData d, int col) {
+               final int nRow = data.getNumRows();
+               final double[] vals = data.getDenseBlockValues();
+               final int nCol = data.getNumColumns();
+               final int max = nRow * nCol; // guaranteed lower than intmax.
+               for(int i = 0, off = col; off < max; i++, off += nCol)
+                       d.set(i, map.getId(vals[off]));
+
+       }
+
+       private void encodeGeneric(MatrixBlock data, AMapToData d, int col) {
+               final int nRow = data.getNumRows();
+               final DenseBlock db = data.getDenseBlock();
+               for(int i = 0; i < nRow; i++) {
+                       final double[] c = db.values(i);
+                       final int off = db.pos(i) + col;
+                       d.set(i, map.getId(c[off]));
+               }
+       }
+
+}
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/colgroup/scheme/EmptyScheme.java
 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/scheme/EmptyScheme.java
index f85e85cf69..5015b54001 100644
--- 
a/src/main/java/org/apache/sysds/runtime/compress/colgroup/scheme/EmptyScheme.java
+++ 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/scheme/EmptyScheme.java
@@ -19,18 +19,16 @@
 
 package org.apache.sysds.runtime.compress.colgroup.scheme;
 
+import org.apache.commons.lang.NotImplementedException;
 import org.apache.sysds.runtime.compress.colgroup.AColGroup;
 import org.apache.sysds.runtime.compress.colgroup.ColGroupEmpty;
 import org.apache.sysds.runtime.compress.colgroup.indexes.IColIndex;
-import org.apache.sysds.runtime.data.SparseBlock;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 
-public class EmptyScheme implements ICLAScheme {
-       /** The instance of a empty column group that in all cases here would 
be returned to be the same */
-       final ColGroupEmpty g;
+public class EmptyScheme extends ACLAScheme {
 
        protected EmptyScheme(ColGroupEmpty g) {
-               this.g = g;
+               super(g.getColIndices());
        }
 
        public static EmptyScheme create(ColGroupEmpty g) {
@@ -38,78 +36,13 @@ public class EmptyScheme implements ICLAScheme {
        }
 
        @Override
-       public AColGroup encode(MatrixBlock data) {
-               return encode(data, g.getColIndices());
+       public ICLAScheme update(MatrixBlock data, IColIndex columns) {
+               throw new NotImplementedException();
        }
 
        @Override
-       public AColGroup encode(MatrixBlock data, IColIndex  columns) {
-
-               if(columns.size() != g.getColIndices().size())
-                       throw new IllegalArgumentException("Invalid columns to 
encode");
-               final int nCol = data.getNumColumns();
-               final int nRow = data.getNumRows();
-               if(nCol < columns.get(columns.size() - 1)) {
-                       LOG.warn("Invalid to encode matrix with less columns 
than encode scheme max column");
-                       return null;
-               }
-               else if(data.isEmpty()) 
-                       return returnG(columns);
-               else if(data.isInSparseFormat())
-                       return encodeSparse(data, columns, nRow, nCol);
-               else if(data.getDenseBlock().isContiguous())
-                       return encodeDense(data, columns, nRow, nCol);
-               else
-                       return encodeGeneric(data, columns, nRow, nCol);
-       }
-
-       private AColGroup encodeDense(final MatrixBlock data, final IColIndex  
cols, final int nRow, final int nCol) {
-               final double[] dv = data.getDenseBlockValues();
-               for(int r = 0; r < nRow; r++) {
-                       final int off = r * nCol;
-                       for(int ci = 0; ci < cols.size(); ci++)
-                               if(dv[off + cols.get(ci)] != 0.0)
-                                       return null;
-               }
-               return g;
-       }
-
-       private AColGroup encodeSparse(final MatrixBlock data, final IColIndex  
cols, final int nRow, final int nCol) {
-               SparseBlock sb = data.getSparseBlock();
-               for(int r = 0; r < nRow; r++) {
-                       if(sb.isEmpty(r))
-                               continue; // great!
-
-                       final int apos = sb.pos(r);
-                       final int alen = apos + sb.size(r);
-                       final int[] aix = sb.indexes(r);
-                       int p = 0; // pointer into cols;
-                       for(int j = apos; j < alen ; j++) {
-                               while(p < cols.size() && cols.get(p) < aix[j])
-                                       p++;
-                               if(p < cols.size() && aix[j] == cols.get(p))
-                                       return null;
-
-                               if(p >= cols.size())
-                                       continue;
-                       }
-               }
-               return returnG(cols);
-       }
-
-       private AColGroup encodeGeneric(final MatrixBlock data, final IColIndex 
 cols, final int nRow, final int nCol) {
-               for(int r = 0; r < nRow; r++)
-                       for(int ci = 0; ci < cols.size(); ci++)
-                               if(data.quickGetValue(r, cols.get(ci)) != 0.0)
-                                       return null;
-               return returnG(cols);
+       public AColGroup encode(MatrixBlock data, IColIndex columns) {
+               validate(data, columns);
+               return new ColGroupEmpty(columns);
        }
-
-       private AColGroup returnG(IColIndex columns) {
-               if(columns == g.getColIndices())
-                       return g; // great!
-               else
-                       return new ColGroupEmpty(columns);
-       }
-
 }
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/colgroup/scheme/ICLAScheme.java
 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/scheme/ICLAScheme.java
index 0c73c4940c..437e674287 100644
--- 
a/src/main/java/org/apache/sysds/runtime/compress/colgroup/scheme/ICLAScheme.java
+++ 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/scheme/ICLAScheme.java
@@ -26,7 +26,7 @@ import 
org.apache.sysds.runtime.compress.colgroup.indexes.IColIndex;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 
 /**
- * Abstract class for a scheme instance.
+ * Interface for a scheme instance.
  * 
  * Instances of this class has the purpose of encoding the minimum values 
required to reproduce a compression scheme,
  * and apply it to unseen data.
@@ -43,24 +43,49 @@ public interface ICLAScheme {
 
        /**
         * Encode the given matrix block into the scheme provided in the 
instance.
+        *       
         * 
-        * The method returns null, if it is impossible to encode the input 
data with the given scheme.
+        * The method is unsafe in the sense that if the encoding scheme does 
not fit, there is no guarantee that an error is
+        * thrown. To guarantee the encoding scheme, first use update on the 
matrix block and use the returned scheme to
+        * ensure consistency.
         * 
         * @param data The data to encode
-        * @return A compressed column group or null.
+        * @throws IllegalArgumentException In the case the columns argument 
number of columns does not correlate with the
+        *                                  scheme's list of columns.
+        * @return A compressed column group forced to use the scheme provided.
         */
        public AColGroup encode(MatrixBlock data);
 
        /**
         * Encode a given matrix block into the scheme provided in the instance 
but overwrite what columns to use.
         * 
-        * The method returns null, if it is impossible to encode the input 
data with the given scheme.
+        * The method is unsafe in the sense that if the encoding scheme does 
not fit, there is no guarantee that an error is
+        * thrown. To guarantee the encoding scheme, first use update on the 
matrix block and use the returned scheme to
+        * ensure consistency.
         * 
         * @param data    The data to encode
         * @param columns The columns to apply the scheme to, but must be of 
same number than the encoded scheme
-        * @throws IllegalArgumentException In the case the columns argument 
number of columns doesent corelate with the
-        *                                  Schemes list of columns.
-        * @return A compressed column group or null
+        * @throws IllegalArgumentException In the case the columns argument 
number of columns does not correlate with the
+        *                                  scheme's list of columns.
+        * @return A compressed column group forced to use the scheme provided.
         */
        public AColGroup encode(MatrixBlock data, IColIndex columns);
+
+       /**
+        * Update the encoding scheme to enable compression of the given data.
+        * 
+        * @param data The data to update into the scheme
+        * @return An updated scheme
+        */
+       public ICLAScheme update(MatrixBlock data);
+
+       /**
+        * Update the encoding scheme to enable compression of the given data.
+        * 
+        * @param data    The data to update into the scheme
+        * @param columns The columns to extract the data from
+        * @return An updated scheme
+        */
+       public ICLAScheme update(MatrixBlock data, IColIndex columns);
+
 }
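
A hedged usage sketch of the update/encode contract documented above (the input
blocks mbSeen and mbNew are assumed): encode no longer returns null when the data
does not fit the scheme, so update should be called first to guarantee consistency.

        ICLAScheme sc = DDCScheme.create(ColIndexFactory.create(1));
        sc = sc.update(mbSeen);          // learn the distinct values of the column
        AColGroup g = sc.encode(mbNew);  // only guaranteed consistent if mbNew was covered by update
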
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/colgroup/scheme/SchemeFactory.java
 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/scheme/SchemeFactory.java
new file mode 100644
index 0000000000..9e98818714
--- /dev/null
+++ 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/scheme/SchemeFactory.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.compress.colgroup.scheme;
+
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.sysds.runtime.compress.colgroup.AColGroup.CompressionType;
+import org.apache.sysds.runtime.compress.colgroup.indexes.IColIndex;
+
+public class SchemeFactory {
+       public static ICLAScheme create(IColIndex columns, CompressionType 
type) {
+               switch(type) {
+                       case CONST:
+                               return ConstScheme.create(columns);
+                       case DDC:
+                               return DDCScheme.create(columns);
+                       case DDCFOR:
+                               break;
+                       case DeltaDDC:
+                               break;
+                       case EMPTY:
+                               break;
+                       case LinearFunctional:
+                               break;
+                       case OLE:
+                               break;
+                       case RLE:
+                               break;
+                       case SDC:
+                               break;
+                       case SDCFOR:
+                               break;
+                       case UNCOMPRESSED:
+                               break;
+                       default:
+                               break;
+               }
+               throw new NotImplementedException("Not Implemented scheme for 
plan of type: " + type);
+       }
+}
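
As a quick illustration of the factory above (column index construction as in the
rest of the diff), only CONST and DDC schemes are wired up so far; every other
compression type falls through to the NotImplementedException:

        IColIndex cols = ColIndexFactory.create(2);
        ICLAScheme c = SchemeFactory.create(cols, CompressionType.CONST); // ConstScheme
        ICLAScheme d = SchemeFactory.create(cols, CompressionType.DDC);   // DDCSchemeMC (multi-column)
        SchemeFactory.create(cols, CompressionType.RLE);                  // throws NotImplementedException
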
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/io/ReaderCompressed.java 
b/src/main/java/org/apache/sysds/runtime/compress/io/ReaderCompressed.java
index bac8374b96..22efcbafd1 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/io/ReaderCompressed.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/io/ReaderCompressed.java
@@ -74,9 +74,10 @@ public final class ReaderCompressed extends MatrixReader {
                throws IOException {
 
                final Map<MatrixIndexes, MatrixBlock> data = new HashMap<>();
-
-               for(Path subPath : IOUtilFunctions.getSequenceFilePaths(fs, 
path))
+               
+               for(Path subPath : IOUtilFunctions.getSequenceFilePaths(fs, 
path)){
                        read(subPath, job, data);
+               }
 
                if(data.size() == 1)
                        return data.entrySet().iterator().next().getValue();
@@ -94,9 +95,9 @@ public final class ReaderCompressed extends MatrixReader {
                        MatrixIndexes key = new MatrixIndexes();
                        CompressedWriteBlock value = new CompressedWriteBlock();
 
+                       
                        while(reader.next(key, value)) {
                                final MatrixBlock g = value.get();
-
                                if(g instanceof CompressedMatrixBlock)
                                        data.put(key, g);
                                else if(g.isEmpty())
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/io/WriterCompressed.java 
b/src/main/java/org/apache/sysds/runtime/compress/io/WriterCompressed.java
index 0c6deecbd2..8c692fe158 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/io/WriterCompressed.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/io/WriterCompressed.java
@@ -20,7 +20,11 @@
 package org.apache.sysds.runtime.compress.io;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -32,6 +36,7 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.SequenceFileOutputFormat;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.sysds.conf.ConfigurationManager;
+import org.apache.sysds.conf.DMLConfig;
 import org.apache.sysds.hops.OptimizerUtils;
 import org.apache.sysds.runtime.DMLRuntimeException;
 import org.apache.sysds.runtime.compress.CompressedMatrixBlock;
@@ -46,6 +51,7 @@ import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 import org.apache.sysds.runtime.matrix.data.MatrixIndexes;
 import org.apache.sysds.runtime.meta.DataCharacteristics;
 import org.apache.sysds.runtime.meta.MatrixCharacteristics;
+import org.apache.sysds.runtime.util.CommonThreadPool;
 import org.apache.sysds.runtime.util.HDFSTool;
 
 public final class WriterCompressed extends MatrixWriter {
@@ -116,72 +122,143 @@ public final class WriterCompressed extends MatrixWriter 
{
 
        private void write(MatrixBlock src, final String fname, final int blen) 
throws IOException {
                final int k = OptimizerUtils.getParallelTextWriteParallelism();
-               final Path path = new Path(fname);
-               final JobConf job = new 
JobConf(ConfigurationManager.getCachedJobConf());
-               // Delete previous
-               HDFSTool.deleteFileIfExistOnHDFS(path, job);
-
-               // Make Writer (New interface)
-               final Writer w = SequenceFile.createWriter(job, 
Writer.file(path), Writer.bufferSize(4096),
-                       Writer.keyClass(MatrixIndexes.class), 
Writer.valueClass(CompressedWriteBlock.class),
-                       Writer.compression(SequenceFile.CompressionType.NONE), 
// No Compression type on disk
-                       Writer.replication((short) 1));
-
                final int rlen = src.getNumRows();
                final int clen = src.getNumColumns();
-
                // Try to compress!
                if(!(src instanceof CompressedMatrixBlock))
                        src = CompressedMatrixBlockFactory.compress(src, 
k).getLeft();
 
                if(rlen <= blen && clen <= blen)
-                       writeSingleBlock(w, src, k);
+                       writeSingleBlock(fname, src, k);
+               else if(!(src instanceof CompressedMatrixBlock))
+                       writeMultiBlockUncompressed(fname, src, rlen, clen, 
blen, k);
                else
-                       writeMultiBlock(w, src, rlen, clen, blen, k);
-
-               IOUtilFunctions.closeSilently(w);
-
-               // Cleanup
-               final FileSystem fs = IOUtilFunctions.getFileSystem(path, job);
-               IOUtilFunctions.deleteCrcFilesFromLocalFileSystem(fs, path);
+                       writeMultiBlockCompressed(fname, src, rlen, clen, blen, 
k);
+               cleanup(fname);
        }
 
-       private void writeSingleBlock(Writer w, MatrixBlock b, int k) throws 
IOException {
+       private void writeSingleBlock(String fname, MatrixBlock b, int k) 
throws IOException {
+               Writer w = getWriter(fname);
                MatrixIndexes idx = new MatrixIndexes(1, 1);
                MatrixBlock mc = CompressedMatrixBlockFactory.compress(b, 
k).getLeft();
                w.append(idx, new CompressedWriteBlock(mc));
+               IOUtilFunctions.closeSilently(w);
        }
 
-       private void writeMultiBlock(Writer w, MatrixBlock b, final int rlen, 
final int clen, final int blen, int k)
-               throws IOException {
+       private void writeMultiBlockUncompressed(String fname, MatrixBlock b, 
final int rlen, final int clen, final int blen,
+               int k) throws IOException {
+               Writer w = getWriter(fname);
                final MatrixIndexes indexes = new MatrixIndexes();
-               if(!(b instanceof CompressedMatrixBlock))
-                       LOG.warn("Writing compressed format with non identical 
compression scheme");
+               LOG.warn("Writing compressed format with non identical 
compression scheme");
 
                for(int bc = 0; bc * blen < clen; bc++) {
                        final int sC = bc * blen;
                        final int mC = Math.min(sC + blen, clen) - 1;
-                       if(b instanceof CompressedMatrixBlock) {
-                               final CompressedMatrixBlock mc = // mC == clen 
- 1 ? (CompressedMatrixBlock) b :
-                                       
CLALibSlice.sliceColumns((CompressedMatrixBlock) b, sC, mC); // slice columns!
+                       for(int br = 0; br * blen < rlen; br++) {
+                               // Max Row and col in block
+                               final int sR = br * blen;
+                               final int mR = Math.min(sR + blen, rlen) - 1;
+                               MatrixBlock mb = b.slice(sR, mR, sC, mC);
+                               MatrixBlock mc = 
CompressedMatrixBlockFactory.compress(mb, k).getLeft();
+                               indexes.setIndexes(br + 1, bc + 1);
+                               w.append(indexes, new CompressedWriteBlock(mc));
+                       }
+
+               }
+
+               IOUtilFunctions.closeSilently(w);
+       }
+
+       private void writeMultiBlockCompressed(String fname, MatrixBlock b, 
final int rlen, final int clen, final int blen,
+               int k) throws IOException {
 
+               setupWrite(fname);
+               final ExecutorService pool = CommonThreadPool.get(k);
+               final ArrayList<WriteTask> tasks = new ArrayList<>();
+               try {
+                       int i = 0;
+                       for(int bc = 0; bc * blen < clen; bc++) {// column 
blocks
+                               final int sC = bc * blen;
+                               final int mC = Math.min(sC + blen, clen) - 1;
+                               final CompressedMatrixBlock mc = 
CLALibSlice.sliceColumns((CompressedMatrixBlock) b, sC, mC);
                                final List<MatrixBlock> blocks = 
CLALibSlice.sliceBlocks(mc, blen); // Slice compressed blocks
-                               for(int br = 0; br * blen < rlen; br++) {
-                                       indexes.setIndexes(br + 1, bc + 1);
-                                       w.append(indexes, new 
CompressedWriteBlock(blocks.get(br)));
+                               final int blocksPerThread = Math.max(1, 
blocks.size() / k);
+                               for(int block = 0; block < blocks.size(); block 
+= blocksPerThread, i++) {
+                                       final Path newPath = new Path(fname, 
IOUtilFunctions.getPartFileName(i));
+                                       tasks.add(new WriteTask(newPath, 
blocks, bc, block, Math.min(blocks.size(), block + blocksPerThread)));
                                }
                        }
-                       else {
-                               for(int br = 0; br * blen < rlen; br++) {
-                                       // Max Row and col in block
-                                       final int sR = br * blen;
-                                       final int mR = Math.min(sR + blen, 
rlen) - 1;
-                                       MatrixBlock mb = b.slice(sR, mR, sC, 
mC);
-                                       MatrixBlock mc = 
CompressedMatrixBlockFactory.compress(mb, k).getLeft();
-                                       indexes.setIndexes(br + 1, bc + 1);
-                                       w.append(indexes, new 
CompressedWriteBlock(mc));
-                               }
+                       for(Future<Object> f : pool.invokeAll(tasks))
+                               f.get();
+                       pool.shutdown();
+               }
+               catch(Exception e) {
+                       pool.shutdown();
+                       throw new IOException("Failed writing compressed multi 
block", e);
+               }
+
+       }
+
+       private static void setupWrite(String fname) throws IOException {
+               final Path path = new Path(fname);
+               final JobConf job = new 
JobConf(ConfigurationManager.getCachedJobConf());
+               HDFSTool.deleteFileIfExistOnHDFS(path, job);
+               HDFSTool.createDirIfNotExistOnHDFS(path, 
DMLConfig.DEFAULT_SHARED_DIR_PERMISSION);
+       }
+
+       private static Writer getWriter(String fname) throws IOException {
+               final Path path = new Path(fname);
+               return getWriter(path);
+       }
+
+       private static Writer getWriter(Path path) throws IOException {
+               final JobConf job = new 
JobConf(ConfigurationManager.getCachedJobConf());
+               HDFSTool.deleteFileIfExistOnHDFS(path, job);
+               return SequenceFile.createWriter(job, Writer.file(path), 
Writer.bufferSize(4096),
+                       Writer.keyClass(MatrixIndexes.class), 
Writer.valueClass(CompressedWriteBlock.class),
+                       Writer.compression(SequenceFile.CompressionType.NONE), 
// No Compression type on disk
+                       Writer.replication((short) 1));
+       }
+
+       private static void cleanup(String fname) throws IOException {
+               final Path path = new Path(fname);
+               cleanup(path);
+       }
+
+       private static void cleanup(Path path) throws IOException {
+               final JobConf job = new 
JobConf(ConfigurationManager.getCachedJobConf());
+               final FileSystem fs = IOUtilFunctions.getFileSystem(path, job);
+               IOUtilFunctions.deleteCrcFilesFromLocalFileSystem(fs, path);
+       }
+
+       private static class WriteTask implements Callable<Object> {
+               final Path path;
+               final List<MatrixBlock> blocks;
+               final int bc;
+               final int bl;
+               final int bu;
+
+               private WriteTask(Path path, List<MatrixBlock> blocks, int bc, 
int bl, int bu) {
+                       this.path = path;
+                       this.blocks = blocks;
+                       // +1 because MatrixIndexes are one-based
+                       this.bl = bl + 1;
+                       this.bu = bu + 1;
+                       this.bc = bc + 1;
+               }
+
+               @Override
+               public Object call() throws Exception {
+                       final Writer w = getWriter(path);
+                       for(int b = bl; b < bu; b++) {
+                               MatrixIndexes index = new MatrixIndexes(b, bc);
+                               CompressedWriteBlock blk = new 
CompressedWriteBlock(blocks.get(b - 1));
+                               w.append(index, blk);
                        }
+                       IOUtilFunctions.closeSilently(w);
+                       cleanup(path);
+                       return null;
                }
        }
+
 }
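
    Side note on the new writeMultiBlockCompressed path above: each column block is
    sliced into row blocks, and those row blocks are chunked into part files, one
    WriteTask per chunk. The stand-alone sketch below (plain Java; the values of
    blocks and k are toy assumptions, not part of the commit) only illustrates the
    chunking arithmetic, i.e. how row blocks map to part files:

        import java.util.ArrayList;
        import java.util.List;

        public class PartFileChunkingSketch {
            public static void main(String[] args) {
                final int k = 4;                               // parallelization degree (assumed)
                final List<Integer> blocks = new ArrayList<>();
                for(int i = 0; i < 10; i++)                    // 10 row blocks in one column block (assumed)
                    blocks.add(i);
                final int blocksPerThread = Math.max(1, blocks.size() / k);
                int part = 0;
                for(int block = 0; block < blocks.size(); block += blocksPerThread, part++) {
                    final int upper = Math.min(blocks.size(), block + blocksPerThread);
                    // part file 'part' receives row blocks [block, upper)
                    System.out.println("part " + part + " <- row blocks " + block + " .. " + (upper - 1));
                }
            }
        }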
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibCombine.java 
b/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibCombine.java
index 36fd4e3299..91238a2927 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibCombine.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibCombine.java
@@ -43,32 +43,54 @@ public class CLALibCombine {
 
        protected static final Log LOG = 
LogFactory.getLog(CLALibCombine.class.getName());
 
+       /**
+        * Combine the map of indexed matrix blocks into a single MatrixBlock.
+        * 
+        * The intention is that the combining is able to resolve differences in the individual MatrixBlocks' allocations.
+        * 
+        * @param m The map of MatrixIndexes to MatrixBlocks
+        * @param k The parallelization degree allowed for this operation
+        * @return The combined matrix.
+        */
        public static MatrixBlock combine(Map<MatrixIndexes, MatrixBlock> m, 
int k) {
-               // Dynamically find rlen, clen and blen;
-               // assume that the blen is the same in all blocks.
-               // assume that all blocks are there ...
                final MatrixIndexes lookup = new MatrixIndexes(1, 1);
                MatrixBlock b = m.get(lookup);
+               if(b == null)
+                       throw new DMLCompressionException("Invalid map to combine: missing the top-left (1,1) MatrixBlock");
                final int blen = Math.max(b.getNumColumns(), b.getNumRows());
 
+               // Dynamically find rlen, clen and blen;
+               final long rows = findRLength(m, b);
+               // TODO utilize the known size of m to extrapolate how many row 
blocks there are
+               // and only use the last rowblock to calculate the total number 
of rows.
+               final long cols = findCLength(m, b);
+
+               return combine(m, lookup, (int) rows, (int) cols, blen, k);
+       }
+
+       public static MatrixBlock combine(Map<MatrixIndexes, MatrixBlock> m, 
int rlen, int clen, int blen, int k) {
+               final MatrixIndexes lookup = new MatrixIndexes();
+               return combine(m, lookup, rlen, clen, blen, k);
+       }
+
+       private static long findRLength(Map<MatrixIndexes, MatrixBlock> m, 
MatrixBlock b) {
+               final MatrixIndexes lookup = new MatrixIndexes(1, 1);
                long rows = 0;
                while((b = m.get(lookup)) != null) {
                        rows += b.getNumRows();
                        lookup.setIndexes(lookup.getRowIndex() + 1, 1);
                }
-               lookup.setIndexes(1, 1);
+               return rows;
+       }
+
+       private static long findCLength(Map<MatrixIndexes, MatrixBlock> m, 
MatrixBlock b) {
+               final MatrixIndexes lookup = new MatrixIndexes(1, 1);
                long cols = 0;
                while((b = m.get(lookup)) != null) {
                        cols += b.getNumColumns();
                        lookup.setIndexes(1, lookup.getColumnIndex() + 1);
                }
-
-               return combine(m, lookup, (int) rows, (int) cols, blen, k);
-       }
-
-       public static MatrixBlock combine(Map<MatrixIndexes, MatrixBlock> m, 
int rlen, int clen, int blen, int k) {
-               final MatrixIndexes lookup = new MatrixIndexes();
-               return combine(m, lookup, rlen, clen, blen, k);
+               return cols;
        }
 
        private static MatrixBlock combine(final Map<MatrixIndexes, 
MatrixBlock> m, final MatrixIndexes lookup,
@@ -95,13 +117,13 @@ public class CLALibCombine {
                        final List<AColGroup> gs = cmb.getColGroups();
                        final int off = bc * blen;
                        for(AColGroup g : gs) {
-                               try{
+                               try {
                                        final IIterate cols = 
g.getColIndices().iterator();
                                        final CompressionType t = 
g.getCompType();
                                        while(cols.hasNext())
                                                colTypes[cols.next() + off] = t;
                                }
-                               catch(Exception e){
+                               catch(Exception e) {
                                        throw new 
DMLCompressionException("Failed combining: " + g.toString());
                                }
                        }
@@ -174,7 +196,7 @@ public class CLALibCombine {
                                final CompressedMatrixBlock cmb = 
(CompressedMatrixBlock) m.get(lookup);
                                for(AColGroup g : cmb.getColGroups()) {
                                        final AColGroup gc = bc > 0 ? 
g.shiftColIndices(bc * blen) : g;
-                                       final int c  = 
gc.getColIndices().get(0);
+                                       final int c = gc.getColIndices().get(0);
                                        if(br == 0)
                                                finalCols[c] = new 
AColGroup[blocksInColumn];
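
    For orientation, a minimal usage sketch of the documented combine(Map, k) entry
    point follows. The block sizes, the value 4 for k, and the use of plain
    uncompressed blocks are assumptions chosen for illustration only, not taken
    from the commit:

        import java.util.HashMap;
        import java.util.Map;

        import org.apache.sysds.runtime.compress.lib.CLALibCombine;
        import org.apache.sysds.runtime.matrix.data.MatrixBlock;
        import org.apache.sysds.runtime.matrix.data.MatrixIndexes;

        public class CombineSketch {
            public static void main(String[] args) {
                // two vertically stacked blocks; MatrixIndexes are one-based
                final Map<MatrixIndexes, MatrixBlock> m = new HashMap<>();
                m.put(new MatrixIndexes(1, 1), new MatrixBlock(1000, 10, 1.0));
                m.put(new MatrixIndexes(2, 1), new MatrixBlock(500, 10, 1.0));
                // rlen, clen and blen are derived from the map itself; k = 4 threads
                final MatrixBlock combined = CLALibCombine.combine(m, 4);
                System.out.println(combined.getNumRows() + " x " + combined.getNumColumns());
            }
        }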
 
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/plan/CompressionPlanFactory.java
 
b/src/main/java/org/apache/sysds/runtime/compress/plan/CompressionPlanFactory.java
new file mode 100644
index 0000000000..e7c9e03df6
--- /dev/null
+++ 
b/src/main/java/org/apache/sysds/runtime/compress/plan/CompressionPlanFactory.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.compress.plan;
+
+import org.apache.sysds.runtime.compress.colgroup.AColGroup.CompressionType;
+import org.apache.sysds.runtime.compress.colgroup.indexes.ColIndexFactory;
+import org.apache.sysds.runtime.compress.colgroup.indexes.IColIndex;
+import org.apache.sysds.runtime.compress.colgroup.indexes.SingleIndex;
+import org.apache.sysds.runtime.compress.colgroup.indexes.TwoIndex;
+import org.apache.sysds.runtime.compress.colgroup.scheme.ICLAScheme;
+import org.apache.sysds.runtime.compress.colgroup.scheme.SchemeFactory;
+
+public class CompressionPlanFactory {
+
+       public static IPlanEncode singleCols(int nCol, CompressionType type, 
int k) {
+               ICLAScheme[] schemes = new ICLAScheme[nCol];
+               for(int i = 0; i < nCol; i++)
+                       schemes[i] = SchemeFactory.create(new SingleIndex(i), 
type);
+               return new NaivePlanEncode(schemes, k, false);
+       }
+
+       public static IPlanEncode twoCols(int nCol, CompressionType type, int 
k) {
+               ICLAScheme[] schemes = new ICLAScheme[nCol / 2 + nCol % 2];
+               for(int i = 0; i < nCol; i += 2) {
+                       schemes[i/2] = i + 1 >= nCol ? //
+                               SchemeFactory.create(new SingleIndex(i), type) 
: //
+                               SchemeFactory.create(new TwoIndex(i, i + 1), 
type);
+               }
+               return new NaivePlanEncode(schemes, k, false);
+       }
+
+       public static IPlanEncode nCols(int nCol, int n, CompressionType type, 
int k){
+               ICLAScheme[] schemes = new ICLAScheme[nCol / n + (nCol % n  != 
0 ? 1 : 0)];
+               for(int i = 0; i < nCol; i += n) {
+                       schemes[i/n] = i + n < nCol ? //
+                               SchemeFactory.create( ColIndexFactory.create(i, 
i+n), type) : //
+                               SchemeFactory.create(ColIndexFactory.create(i, 
nCol), type);
+               }
+               return new NaivePlanEncode(schemes, k, false);
+       }
+
+       public static IPlanEncode create(IColIndex[] columnGroups, 
CompressionType[] type, int k) {
+               ICLAScheme[] schemes = new ICLAScheme[columnGroups.length];
+               for(int i = 0; i < columnGroups.length; i++)
+                       schemes[i] = SchemeFactory.create(columnGroups[i], 
type[i]);
+
+               // TODO check for overlapping
+               return new NaivePlanEncode(schemes, k, false);
+       }
+}
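
    To illustrate how this factory is intended to be used together with IPlanEncode
    (introduced in the next file), here is a minimal sketch; the matrix dimensions,
    the constant fill value, and single-threaded k = 1 are assumptions chosen for
    the example, not part of the commit:

        import org.apache.sysds.runtime.compress.CompressedMatrixBlock;
        import org.apache.sysds.runtime.compress.colgroup.AColGroup.CompressionType;
        import org.apache.sysds.runtime.compress.plan.CompressionPlanFactory;
        import org.apache.sysds.runtime.compress.plan.IPlanEncode;
        import org.apache.sysds.runtime.matrix.data.MatrixBlock;

        public class PlanFactorySketch {
            public static void main(String[] args) {
                final MatrixBlock mb = new MatrixBlock(100, 4, 3.0);   // toy constant input
                // one DDC scheme per column
                final IPlanEncode plan = CompressionPlanFactory.singleCols(4, CompressionType.DDC, 1);
                plan.expandPlan(mb);                  // grow the schemes to cover this block
                final CompressedMatrixBlock cmb = plan.encode(mb);
                System.out.println(cmb.getColGroups().size() + " column groups");
            }
        }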
diff --git a/src/main/java/org/apache/sysds/runtime/compress/utils/DCounts.java 
b/src/main/java/org/apache/sysds/runtime/compress/plan/IPlanEncode.java
similarity index 51%
rename from src/main/java/org/apache/sysds/runtime/compress/utils/DCounts.java
rename to src/main/java/org/apache/sysds/runtime/compress/plan/IPlanEncode.java
index 316dd30ed3..e9421974f9 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/utils/DCounts.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/plan/IPlanEncode.java
@@ -16,25 +16,31 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sysds.runtime.compress.utils;
 
-public class DCounts {
-       final public double key;
-       public int count;
-       public int id;
+package org.apache.sysds.runtime.compress.plan;
 
-       public DCounts(double key, int id) {
-               this.key = key;
-               this.id = id;
-               count = 1;
-       }
+import org.apache.sysds.runtime.compress.CompressedMatrixBlock;
+import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 
-       public void inc() {
-               count++;
-       }
+/**
+ * Interface for constructing a compression plan that is able to compress sequences of matrix blocks with the same scheme.
+ */
+public interface IPlanEncode {
+
+       /**
+        * Encode a given matrix block according to the plan, throwing an exception if the block cannot be compressed with this plan.
+        * 
+        * @param in The matrix block to encode
+        * @return A compressed matrix block with the plan's schemes applied.
+        */
+       public CompressedMatrixBlock encode(MatrixBlock in);
+
+       /**
+        * Given a block, expand the plan so that it is able to encode that block.
+        * 
+        * @param in The block used to expand the plan
+        */
+       public void expandPlan(MatrixBlock in);
 
-       @Override
-       public String toString() {
-               return "[" + key + ", " + count + "]";
-       }
 }
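
    Since the interface is meant for compressing a sequence of blocks with one
    shared scheme, a typical call pattern is to expand the plan on each incoming
    block before encoding it. The loop below is a sketch of that pattern; the
    helper method and its Iterable argument are hypothetical, only the two
    interface methods come from the commit:

        import java.util.ArrayList;
        import java.util.List;

        import org.apache.sysds.runtime.compress.CompressedMatrixBlock;
        import org.apache.sysds.runtime.compress.plan.IPlanEncode;
        import org.apache.sysds.runtime.matrix.data.MatrixBlock;

        public class PlanSequenceSketch {
            // encode a sequence of blocks with one shared plan (illustrative only)
            static List<CompressedMatrixBlock> encodeAll(IPlanEncode plan, Iterable<MatrixBlock> blocks) {
                final List<CompressedMatrixBlock> out = new ArrayList<>();
                for(MatrixBlock mb : blocks) {
                    plan.expandPlan(mb);        // make sure the plan covers the values in this block
                    out.add(plan.encode(mb));   // encode with the current, possibly expanded, scheme
                }
                return out;
            }
        }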
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/plan/NaivePlanEncode.java 
b/src/main/java/org/apache/sysds/runtime/compress/plan/NaivePlanEncode.java
new file mode 100644
index 0000000000..ea7ffc9a56
--- /dev/null
+++ b/src/main/java/org/apache/sysds/runtime/compress/plan/NaivePlanEncode.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.compress.plan;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+
+import org.apache.sysds.runtime.compress.CompressedMatrixBlock;
+import org.apache.sysds.runtime.compress.DMLCompressionException;
+import org.apache.sysds.runtime.compress.colgroup.AColGroup;
+import org.apache.sysds.runtime.compress.colgroup.scheme.ICLAScheme;
+import org.apache.sysds.runtime.matrix.data.MatrixBlock;
+import org.apache.sysds.runtime.util.CommonThreadPool;
+
+/**
+ * Naive implementation of encoding based on a plan. This does not reuse plans 
across groups, and does not smartly
+ * extract encodings.
+ */
+public class NaivePlanEncode implements IPlanEncode {
+
+       /** The schemes to apply to the input. */
+       private final ICLAScheme[] schemes;
+       /** The parallelization degree to use in this encoder. */
+       private final int k;
+       /** Whether the schemes are overlapping */
+       private final boolean overlapping;
+
+       public NaivePlanEncode(ICLAScheme[] schemes, int k, boolean 
overlapping) {
+               this.schemes = schemes;
+               this.k = k;
+               this.overlapping = overlapping;
+       }
+
+       @Override
+       public CompressedMatrixBlock encode(MatrixBlock in) {
+               try {
+                       final List<AColGroup> groups = k <= 1 ? 
encodeSingleThread(in) : encodeMultiThread(in);
+                       return new CompressedMatrixBlock(in.getNumRows(), 
in.getNumColumns(), in.getNonZeros(), overlapping, groups);
+               }
+               catch(Exception e) {
+                       throw new DMLCompressionException("Failed encoding 
matrix", e);
+               }
+       }
+
+       private List<AColGroup> encodeSingleThread(MatrixBlock in) {
+               List<AColGroup> groups = new ArrayList<>(schemes.length);
+               for(int i = 0; i < schemes.length; i++)
+                       groups.add(schemes[i].encode(in));
+               return groups;
+       }
+
+       private List<AColGroup> encodeMultiThread(MatrixBlock in) throws 
Exception {
+               final ExecutorService pool = CommonThreadPool.get(k);
+               try {
+
+                       List<EncodeTask> t = new ArrayList<>(schemes.length);
+                       for(int i = 0; i < schemes.length; i++)
+                               t.add(new EncodeTask(in, schemes[i]));
+
+                       List<AColGroup> groups = new 
ArrayList<>(schemes.length);
+                       for(Future<AColGroup> f : pool.invokeAll(t))
+                               groups.add(f.get());
+                       return groups;
+               }
+               finally {
+                       pool.shutdown();
+               }
+       }
+
+       @Override
+       public void expandPlan(MatrixBlock in) {
+               try {
+                       if(k <= 1)
+                               expandPlanSingleThread(in);
+                       else
+                               expandPlanMultiThread(in);
+               }
+               catch(Exception e) {
+                       throw new DMLCompressionException("Failed expanding 
plan", e);
+               }
+       }
+
+       public void expandPlanSingleThread(MatrixBlock in) {
+
+               for(int i = 0; i < schemes.length; i++)
+                       schemes[i] = schemes[i].update(in);
+       }
+
+       public void expandPlanMultiThread(MatrixBlock in) throws Exception {
+               final ExecutorService pool = CommonThreadPool.get(k);
+               try {
+                       List<ExpandTask> t = new ArrayList<>(schemes.length);
+                       for(int i = 0; i < schemes.length; i++)
+                               t.add(new ExpandTask(in, schemes[i]));
+                       int i = 0;
+                       for(Future<ICLAScheme> f : pool.invokeAll(t))
+                               schemes[i++] = f.get();
+
+               }
+               finally {
+                       pool.shutdown();
+               }
+
+       }
+
+       @Override
+       public final String toString() {
+               StringBuilder sb = new StringBuilder();
+               sb.append(this.getClass().getSimpleName());
+               sb.append(" Parallelization: " + k);
+               sb.append(" Overlapping: " + overlapping);
+               sb.append("\n");
+               for(int i = 0; i < schemes.length; i++) {
+                       sb.append(schemes[i]);
+                       sb.append("\n");
+               }
+               return sb.toString();
+       }
+
+       private static class EncodeTask implements Callable<AColGroup> {
+               private final MatrixBlock in;
+               private final ICLAScheme sc;
+
+               protected EncodeTask(MatrixBlock in, ICLAScheme sc) {
+                       this.in = in;
+                       this.sc = sc;
+               }
+
+               @Override
+               public AColGroup call() throws Exception {
+                       try {
+                               return sc.encode(in);
+                       }
+                       catch(Exception e) {
+                               throw new DMLCompressionException("Failed encoding scheme", e);
+                       }
+               }
+       }
+
+       private static class ExpandTask implements Callable<ICLAScheme> {
+               private final MatrixBlock in;
+               private final ICLAScheme sc;
+
+               protected ExpandTask(MatrixBlock in, ICLAScheme sc) {
+                       this.in = in;
+                       this.sc = sc;
+               }
+
+               @Override
+               public ICLAScheme call() throws Exception {
+                       try {
+
+                               return sc.update(in);
+                       }
+                       catch(Exception e) {
+                               throw new DMLCompressionException("Failed expanding scheme", e);
+                       }
+               }
+       }
+}
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/readers/ReaderColumnSelection.java
 
b/src/main/java/org/apache/sysds/runtime/compress/readers/ReaderColumnSelection.java
index 8f5661ee93..e3dff6e86d 100644
--- 
a/src/main/java/org/apache/sysds/runtime/compress/readers/ReaderColumnSelection.java
+++ 
b/src/main/java/org/apache/sysds/runtime/compress/readers/ReaderColumnSelection.java
@@ -46,8 +46,14 @@ public abstract class ReaderColumnSelection {
                _colIndexes = colIndexes;
                _rl = rl;
                _ru = ru;
-               reusableArr = new double[colIndexes.size()];
-               reusableReturn = new DblArray(reusableArr);
+               if(colIndexes != null) {
+                       reusableArr = new double[colIndexes.size()];
+                       reusableReturn = new DblArray(reusableArr);
+               }
+               else {
+                       reusableArr = null;
+                       reusableReturn = null;
+               }
        }
 
        /**
@@ -105,8 +111,6 @@ public abstract class ReaderColumnSelection {
                if(colIndices.size() <= 1)
                        throw new DMLCompressionException(
                                "Column selection reader should not be done on 
single column groups: " + colIndices);
-               else if(rawBlock.getSparseBlock() == null && 
rawBlock.getDenseBlock() == null)
-                       throw new DMLCompressionException("Input Block was 
null");
                else if(rl >= ru)
                        throw new DMLCompressionException("Invalid inverse 
range for reader " + rl + " to " + ru);
        }
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/utils/DArrCounts.java 
b/src/main/java/org/apache/sysds/runtime/compress/utils/ACount.java
similarity index 57%
rename from 
src/main/java/org/apache/sysds/runtime/compress/utils/DArrCounts.java
rename to src/main/java/org/apache/sysds/runtime/compress/utils/ACount.java
index 2c696e6c5c..49949c6e32 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/utils/DArrCounts.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/utils/ACount.java
@@ -16,25 +16,54 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.sysds.runtime.compress.utils;
 
-public class DArrCounts {
-       public final DblArray key;
+public abstract class ACount {
        public int count;
        public int id;
 
-       public DArrCounts(DblArray key, int id) {
-               this.key = key;
-               this.id = id;
-               count = 1;
+       protected abstract Object K();
+
+       @Override
+       public final String toString() {
+               return "[K:" + K() + ", <ID:" + id + ",C:" + count + ">]";
        }
 
-       public void inc() {
-               count++;
+       public static class DArrCounts extends ACount {
+               public final DblArray key;
+
+               public DArrCounts(DblArray key, int id) {
+                       this.key = key;
+                       this.id = id;
+                       count = 1;
+               }
+
+               public void inc() {
+                       count++;
+               }
+
+               protected Object K() {
+                       return key;
+               }
        }
 
-       @Override
-       public String toString() {
-               return "[" + key + ", " + count + "]";
+       public static class DCounts extends ACount {
+               final public double key;
+
+               public DCounts(double key, int id) {
+                       this.key = key;
+                       this.id = id;
+                       count = 1;
+               }
+
+               public void inc() {
+                       count++;
+               }
+
+               protected Object K() {
+                       return key;
+               }
+
        }
 }
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/utils/DblArrayCountHashMap.java
 
b/src/main/java/org/apache/sysds/runtime/compress/utils/DblArrayCountHashMap.java
index 9bd033cbfd..29694e18b3 100644
--- 
a/src/main/java/org/apache/sysds/runtime/compress/utils/DblArrayCountHashMap.java
+++ 
b/src/main/java/org/apache/sysds/runtime/compress/utils/DblArrayCountHashMap.java
@@ -24,6 +24,7 @@ import java.util.Arrays;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.sysds.runtime.compress.utils.ACount.DArrCounts;
 
 public class DblArrayCountHashMap {
 
@@ -88,6 +89,23 @@ public class DblArrayCountHashMap {
                }
        }
 
+       public int increment(DblArray key, int count) {
+               final int hash = key.hashCode();
+               final int ix = indexFor(hash, _data.length);
+
+               Bucket l = _data[ix];
+               while(true) {
+                       if(l == null)
+                               return addNewBucket(ix, key);
+                       else if(l.v.key.equals(key)) {
+                               l.v.count += count;
+                               return l.v.id;
+                       }
+                       else
+                               l = l.n;
+               }
+       }
+
        private synchronized int addNewBucket(final int ix, final DblArray key) 
{
                Bucket ob = _data[ix];
                _data[ix] = new Bucket(new DArrCounts(new DblArray(key), 
_size));
@@ -234,9 +252,18 @@ public class DblArrayCountHashMap {
        public String toString() {
                StringBuilder sb = new StringBuilder();
                sb.append(this.getClass().getSimpleName());
-               for(int i = 0; i < _data.length; i++)
-                       if(_data[i] != null)
-                               sb.append(", " + _data[i]);
+               if(_size == 0) {
+                       sb.append(" []");
+                       return sb.toString();
+               }
+               sb.append(" [");
+               for(int i = 0; i < _data.length; i++)
+                       if(_data[i] != null) {
+                               sb.append(_data[i]);
+                               sb.append(", ");
+                       }
+               sb.setLength(sb.length() - 2);
+               sb.append("]");
                return sb.toString();
        }
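
    The new increment(DblArray, int) overload allows merging a pre-counted row
    pattern in one call instead of repeated single increments. A small sketch
    follows; the helper method and its arguments are hypothetical, only DblArray
    and the overload itself come from the code above:

        import org.apache.sysds.runtime.compress.utils.DblArray;
        import org.apache.sysds.runtime.compress.utils.DblArrayCountHashMap;

        public class CountMergeSketch {
            // merge a row pattern observed 'seen' times; returns the id assigned to the pattern
            static int merge(DblArrayCountHashMap map, double[] rowPattern, int seen) {
                return map.increment(new DblArray(rowPattern), seen);
            }
        }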
 
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/utils/DoubleCountHashMap.java 
b/src/main/java/org/apache/sysds/runtime/compress/utils/DoubleCountHashMap.java
index 15b0f8ebb9..c496b447f4 100644
--- 
a/src/main/java/org/apache/sysds/runtime/compress/utils/DoubleCountHashMap.java
+++ 
b/src/main/java/org/apache/sysds/runtime/compress/utils/DoubleCountHashMap.java
@@ -23,6 +23,7 @@ import java.util.Arrays;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.sysds.runtime.compress.utils.ACount.DCounts;
 
 public class DoubleCountHashMap {
 
@@ -119,6 +120,27 @@ public class DoubleCountHashMap {
                }
        }
 
+       /**
+        * Get the ID behind the key. A NaN key resolves to the ID of 0.0; a missing non-NaN key results in an exception.
+        * 
+        * @param key The key value
+        * @return The ID of the key
+        */
+       public int getId(double key) {
+               try {
+                       int ix = hashIndex(key);
+                       Bucket l = _data[ix];
+                       while(!(l.v.key == key))
+                               l = l.n;
+                       return l.v.id;
+               }
+               catch(Exception e) {
+                       if(Double.isNaN(key))
+                               return get(0.0);
+                       throw e;
+               }
+       }
+
        public int getOrDefault(double key, int def) {
                int ix = hashIndex(key);
                Bucket l = _data[ix];
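
    As a usage note on getId above: NaN keys are resolved to the id of 0.0 rather
    than failing. The tiny sketch below illustrates that contract; the helper is
    hypothetical and assumes the map already contains 0.0:

        import org.apache.sysds.runtime.compress.utils.DoubleCountHashMap;

        public class GetIdSketch {
            // look up a value's dictionary id; NaN falls back to the id of 0.0
            static int idOf(DoubleCountHashMap map, double v) {
                return map.getId(v);
            }
        }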
diff --git 
a/src/main/java/org/apache/sysds/runtime/io/WriterBinaryBlockParallel.java 
b/src/main/java/org/apache/sysds/runtime/io/WriterBinaryBlockParallel.java
index c66a5083b0..e483bbd6ff 100644
--- a/src/main/java/org/apache/sysds/runtime/io/WriterBinaryBlockParallel.java
+++ b/src/main/java/org/apache/sysds/runtime/io/WriterBinaryBlockParallel.java
@@ -115,7 +115,6 @@ public class WriterBinaryBlockParallel extends 
WriterBinaryBlock
                        _rl = rl;
                        _ru = ru;
                        _blen = blen;
-                       _blen = blen;
                }
        
                @Override
diff --git 
a/src/main/java/org/apache/sysds/runtime/transform/encode/CompressedEncode.java 
b/src/main/java/org/apache/sysds/runtime/transform/encode/CompressedEncode.java
index 32690484d8..1605e08e7d 100644
--- 
a/src/main/java/org/apache/sysds/runtime/transform/encode/CompressedEncode.java
+++ 
b/src/main/java/org/apache/sysds/runtime/transform/encode/CompressedEncode.java
@@ -146,18 +146,12 @@ public class CompressedEncode {
                Array<?> a = in.getColumn(colId - 1);
                HashMap<?, Long> map = a.getRecodeMap();
                int domain = map.size();
-
                IColIndex colIndexes = ColIndexFactory.create(0, domain);
-
                ADictionary d = new IdentityDictionary(colIndexes.size());
-
                AMapToData m = createMappingAMapToData(a, map);
-
                List<ColumnEncoder> r = c.getEncoders();
                r.set(0, new ColumnEncoderRecode(colId, (HashMap<Object, Long>) 
map));
-
                return ColGroupDDC.create(colIndexes, d, m, null);
-
        }
 
        @SuppressWarnings("unchecked")
diff --git 
a/src/test/java/org/apache/sysds/test/component/compress/colgroup/scheme/CLAConstSchemeTest.java
 
b/src/test/java/org/apache/sysds/test/component/compress/colgroup/scheme/CLAConstSchemeTest.java
index f571809e7c..31b78c959d 100644
--- 
a/src/test/java/org/apache/sysds/test/component/compress/colgroup/scheme/CLAConstSchemeTest.java
+++ 
b/src/test/java/org/apache/sysds/test/component/compress/colgroup/scheme/CLAConstSchemeTest.java
@@ -47,23 +47,7 @@ public class CLAConstSchemeTest {
                assertTrue(sh != null);
        }
 
-       @Test
-       public void testToSmallMatrix() {
-               assertTrue(sh.encode(new MatrixBlock(1, 3, new double[] {//
-                       1.1, 1.2, 1.3})) == null);
-       }
-
-       @Test
-       public void testWrongValuesSingleRow() {
-               assertTrue(sh.encode(new MatrixBlock(1, 5, new double[] {//
-                       0.0, 1.1, 0.2, 1.2, 0.2, 1.2})) == null);
-       }
 
-       @Test
-       public void testWrongValuesSingleRowV2() {
-               assertTrue(sh.encode(new MatrixBlock(1, 5, new double[] {//
-                       0.0, 1.0, 0.2, 1.2, 0.2, 1.3})) == null);
-       }
 
        @Test
        public void testValidEncodeSingleRow() {
@@ -107,25 +91,8 @@ public class CLAConstSchemeTest {
                })) != null);
        }
 
-       @Test
-       public void testInvalidEncodeValueMultiRowMultiError() {
-               assertTrue(sh.encode(new MatrixBlock(4, 6, new double[] {//
-                       0.0, 1.1, 0.2, 1.2, 0.2, 1.3, //
-                       0.0, 1.1, 0.2, 1.2, 0.2, 1.3, //
-                       0.0, 1.1, 0.2, 1.2, 0.2, 1.4, //
-                       0.0, 1.1, 0.2, 1.2, 0.2, 1.3, //
-               })) == null);
-       }
 
-       @Test
-       public void testInvalidEncodeMultiRow() {
-               assertTrue(sh.encode(new MatrixBlock(4, 6, new double[] {//
-                       0.0, 1.3, 0.2, 1.2, 0.2, 1.3, //
-                       0.0, 1.1, 0.2, 1.2, 0.2, 1.3, //
-                       0.0, 1.1, 0.2, 1.2, 0.2, 1.4, //
-                       0.0, 1.1, 0.2, 1.2, 0.2, 1.3, //
-               })) == null);
-       }
+
 
        @Test
        public void testEncodeOtherColumns() {
@@ -134,20 +101,10 @@ public class CLAConstSchemeTest {
                        1.1, 0.2, 1.2, 0.2, 1.3, //
                        1.1, 0.2, 1.2, 0.2, 1.3, //
                        1.1, 0.2, 1.2, 0.2, 1.3, //
-               }),
-
-                       ColIndexFactory.create(new int[] {0, 2, 4})) != null);
+               }), ColIndexFactory.create(new int[] {0, 2, 4})) != null);
        }
 
-       @Test
-       public void testEncodeOtherColumnsInvalid() {
-               assertTrue(sh.encode(new MatrixBlock(4, 5, new double[] {//
-                       1.1, 0.2, 1.2, 0.2, 1.3, //
-                       1.1, 0.2, 1.2, 0.2, 1.3, //
-                       1.1, 0.2, 1.4, 0.2, 1.3, //
-                       1.1, 0.2, 1.2, 0.2, 1.3, //
-               }), ColIndexFactory.create(new int[] {0, 2, 4})) == null);
-       }
+
 
        @Test(expected = IllegalArgumentException.class)
        public void testInvalidArgument_1() {
@@ -174,20 +131,6 @@ public class CLAConstSchemeTest {
                assertTrue(sh.encode(mb) != null);
        }
 
-       @Test
-       public void testSparse_invalid() {
-               MatrixBlock mb = new MatrixBlock(4, 6, new double[] {//
-                       0.0, 1.1, 0.2, 1.2, 0.2, 1.3, //
-                       0.0, 1.1, 0.2, 1.2, 0.2, 1.3, //
-                       0.0, 1.1, 0.2, 1.2, 0.2, 1.3, //
-                       0.0, 1.1, 0.2, 1.2, 0.2, 1.3, //
-               });
-
-               MatrixBlock empty = new MatrixBlock(4, 1000, 0.0);
-               mb = empty.append(mb);
-
-               assertTrue(sh.encode(mb) == null);
-       }
 
        @Test
        public void testSparseValidCustom() {
@@ -219,43 +162,9 @@ public class CLAConstSchemeTest {
                assertTrue(sh.encode(comb, ColIndexFactory.create(new int[] 
{1001, 1003, 1005})) != null);
        }
 
-       @Test
-       public void testSparseValidCustom3Invalid() {
-               MatrixBlock mb = new MatrixBlock(4, 6, new double[] {//
-                       0.0, 1.1, 0.2, 1.2, 0.2, 1.3, //
-                       0.0, 1.1, 0.2, 1.33, 0.2, 1.3, //
-                       0.0, 1.1, 0.2, 1.2, 0.2, 1.3, //
-                       0.0, 1.1, 0.2, 1.2, 0.2, 1.3, //
-               });
-
-               MatrixBlock empty = new MatrixBlock(4, 1000, 0.0);
-               MatrixBlock comb = empty.append(mb).append(mb);
-
-               assertTrue(sh.encode(comb, ColIndexFactory.create(new int[] 
{1001, 1003, 1005})) == null);
-       }
-
-       @Test
-       public void testSparseEmptyRowInvalid() {
-               MatrixBlock mb = new MatrixBlock(4, 6, new double[] {//
-                       0.0, 1.1, 0.2, 1.2, 0.2, 1.3, //
-                       0.0, 1.1, 0.2, 1.2, 0.2, 1.3, //
-                       0.0, 1.1, 0.2, 1.2, 0.2, 1.3, //
-                       0.0, 1.1, 0.2, 1.2, 0.2, 1.3, //
-               });
 
-               MatrixBlock empty = new MatrixBlock(4, 1000, 0.0);
-               mb = empty.append(mb);
-               MatrixBlock emptyRow = new MatrixBlock(1, 1006, 0.0);
-               mb = mb.append(emptyRow, false);
 
-               assertTrue(sh.encode(mb, ColIndexFactory.create(new int[] 
{1001, 1003, 1005})) == null);
-       }
 
-       @Test
-       public void testEmpty() {
-               MatrixBlock empty = new MatrixBlock(4, 1000, 0.0);
-               assertTrue(sh.encode(empty) == null);
-       }
 
        @Test
        public void testGenericNonContinuosBlockValid() {
@@ -270,18 +179,7 @@ public class CLAConstSchemeTest {
                assertTrue(sh.encode(mb) != null);
        }
 
-       @Test
-       public void testGenericNonContinuosBlockInValid() {
-               MatrixBlock mb = new MatrixBlock(4, 6, //
-                       new DenseBlockFP64Mock(new int[] {4, 6}, new double[] 
{//
-                               0.2, 1.1, 0.4, 1.2, 0.3, 1.3, //
-                               0.0, 1.1, 0.2, 1.2, 0.2, 1.3, //
-                               0.0, 1.1, 0.2, 1.2, 0.1, 1.3, //
-                               0.2, 1.22, 0.4, 1.2, 0.1, 1.3, //
-                       }));
-               mb.recomputeNonZeros();
-               assertTrue(sh.encode(mb) == null);
-       }
+
 
        @Test(expected = NullPointerException.class)
        public void testNull() {
diff --git 
a/src/test/java/org/apache/sysds/test/component/compress/colgroup/scheme/CLADDCSchemeTest.java
 
b/src/test/java/org/apache/sysds/test/component/compress/colgroup/scheme/CLADDCSchemeTest.java
index 5e4ca40486..0c40209f9c 100644
--- 
a/src/test/java/org/apache/sysds/test/component/compress/colgroup/scheme/CLADDCSchemeTest.java
+++ 
b/src/test/java/org/apache/sysds/test/component/compress/colgroup/scheme/CLADDCSchemeTest.java
@@ -20,17 +20,23 @@
 package org.apache.sysds.test.component.compress.colgroup.scheme;
 
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import java.util.EnumSet;
 import java.util.List;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.sysds.runtime.compress.CompressionSettings;
 import org.apache.sysds.runtime.compress.CompressionSettingsBuilder;
 import org.apache.sysds.runtime.compress.colgroup.AColGroup;
 import org.apache.sysds.runtime.compress.colgroup.AColGroup.CompressionType;
+import org.apache.sysds.runtime.compress.colgroup.ColGroupDDC;
 import org.apache.sysds.runtime.compress.colgroup.ColGroupFactory;
 import org.apache.sysds.runtime.compress.colgroup.indexes.ColIndexFactory;
 import org.apache.sysds.runtime.compress.colgroup.indexes.IColIndex;
+import org.apache.sysds.runtime.compress.colgroup.indexes.RangeIndex;
+import org.apache.sysds.runtime.compress.colgroup.scheme.DDCScheme;
 import org.apache.sysds.runtime.compress.colgroup.scheme.ICLAScheme;
 import org.apache.sysds.runtime.compress.estim.ComEstExact;
 import org.apache.sysds.runtime.compress.estim.CompressedSizeInfo;
@@ -40,6 +46,7 @@ import org.apache.sysds.test.TestUtils;
 import org.junit.Test;
 
 public class CLADDCSchemeTest {
+       protected final Log LOG = 
LogFactory.getLog(CLADDCSchemeTest.class.getName());
 
        final MatrixBlock src;
        final AColGroup g;
@@ -48,7 +55,7 @@ public class CLADDCSchemeTest {
        public CLADDCSchemeTest() {
                src = TestUtils.round(TestUtils.generateTestMatrixBlock(1023, 
3, 0, 3, 0.9, 7));
 
-               IColIndex colIndexes =  ColIndexFactory.create(3);
+               IColIndex colIndexes = ColIndexFactory.create(3);
                CompressionSettings cs = new 
CompressionSettingsBuilder().setSamplingRatio(1.0)
                        
.setValidCompressions(EnumSet.of(CompressionType.DDC)).create();
                final CompressedSizeInfoColGroup cgi = new ComEstExact(src, 
cs).getColGroupInfo(colIndexes);
@@ -66,7 +73,7 @@ public class CLADDCSchemeTest {
 
        @Test(expected = IllegalArgumentException.class)
        public void testInvalidColumnApply_2() {
-               sh.encode(null,  ColIndexFactory.create(new int[] {1, 2, 5, 
5}));
+               sh.encode(null, ColIndexFactory.create(new int[] {1, 2, 5, 5}));
        }
 
        @Test(expected = NullPointerException.class)
@@ -87,46 +94,53 @@ public class CLADDCSchemeTest {
        @Test
        public void testEncodeSparseDifferentColumns() {
                
assertTrue(sh.encode(TestUtils.round(TestUtils.generateTestMatrixBlock(2, 100, 
0, 3, 0.01, 7)),
-               ColIndexFactory.create(new int[] {13, 16, 30})) != null);
+                       ColIndexFactory.create(new int[] {13, 16, 30})) != 
null);
        }
 
        @Test
        public void testEncodeSparseDifferentColumns2() {
                
assertTrue(sh.encode(TestUtils.round(TestUtils.generateTestMatrixBlock(2, 100, 
0, 3, 0.01, 7)),
-               ColIndexFactory.create(new int[] {15, 16, 99})) != null);
+                       ColIndexFactory.create(new int[] {15, 16, 99})) != 
null);
        }
 
        @Test
        public void testEncodeSparseDifferentColumns3() {
                
assertTrue(sh.encode(TestUtils.round(TestUtils.generateTestMatrixBlock(2, 100, 
0, 3, 0.01, 7)),
-               ColIndexFactory.create(new int[] {15, 86, 99})) != null);
+                       ColIndexFactory.create(new int[] {15, 86, 99})) != 
null);
        }
 
        @Test
        public void testEncodeDenseDifferentColumns() {
                
assertTrue(sh.encode(TestUtils.round(TestUtils.generateTestMatrixBlock(2, 100, 
0, 3, 0.86, 7)),
-               ColIndexFactory.create(new int[] {13, 16, 30})) != null);
+                       ColIndexFactory.create(new int[] {13, 16, 30})) != 
null);
        }
 
        @Test
        public void testEncodeDenseDifferentColumns2() {
                
assertTrue(sh.encode(TestUtils.round(TestUtils.generateTestMatrixBlock(2, 100, 
0, 3, 0.86, 7)),
-               ColIndexFactory.create(new int[] {15, 16, 99})) != null);
+                       ColIndexFactory.create(new int[] {15, 16, 99})) != 
null);
        }
 
        @Test
        public void testEncodeDenseDifferentColumns3() {
                
assertTrue(sh.encode(TestUtils.round(TestUtils.generateTestMatrixBlock(2, 100, 
0, 3, 0.86, 7)),
-               ColIndexFactory.create(new int[] {15, 86, 99})) != null);
+                       ColIndexFactory.create(new int[] {15, 86, 99})) != 
null);
        }
 
        @Test
-       public void testEncodeInvalid() {
-               
assertTrue(sh.encode(TestUtils.round(TestUtils.generateTestMatrixBlock(2, 3, 3, 
6, 0.9, 7))) == null);
-       }
-
-       @Test
-       public void testEncodeInvalid_2() {
-               
assertTrue(sh.encode(TestUtils.round(TestUtils.generateTestMatrixBlock(200, 3, 
3, 6, 0.9, 7))) == null);
+       public void testEncodeFromColumns() {
+               try {
+
+                       DDCScheme s = DDCScheme.create(new RangeIndex(2));
+                       MatrixBlock m = 
TestUtils.round(TestUtils.generateTestMatrixBlock(50, 3, 0, 2, 0.9, 7));
+                       s.update(m);
+                       AColGroup g = s.encode(m);
+                       assertTrue(g instanceof ColGroupDDC);
+               }
+
+               catch(Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
        }
 }
diff --git 
a/src/test/java/org/apache/sysds/test/component/compress/colgroup/scheme/CLAEmptySchemeTest.java
 
b/src/test/java/org/apache/sysds/test/component/compress/colgroup/scheme/CLAEmptySchemeTest.java
index f5ac4f8450..b27dd42c99 100644
--- 
a/src/test/java/org/apache/sysds/test/component/compress/colgroup/scheme/CLAEmptySchemeTest.java
+++ 
b/src/test/java/org/apache/sysds/test/component/compress/colgroup/scheme/CLAEmptySchemeTest.java
@@ -36,7 +36,7 @@ public class CLAEmptySchemeTest {
 
        public CLAEmptySchemeTest() {
                g = new ColGroupEmpty(//
-               ColIndexFactory.create(new int[] {1, 3, 5}) // Columns
+                       ColIndexFactory.create(new int[] {1, 3, 5}) // Columns
                );
                sh = g.getCompressionScheme();
        }
@@ -46,22 +46,10 @@ public class CLAEmptySchemeTest {
                assertTrue(sh != null);
        }
 
-       @Test
+       @Test(expected = IllegalArgumentException.class)
        public void testToSmallMatrix() {
-               assertTrue(sh.encode(new MatrixBlock(1, 3, new double[] {//
-                       1.1, 1.2, 1.3})) == null);
-       }
-
-       @Test
-       public void testWrongValuesSingleRow() {
-               assertTrue(sh.encode(new MatrixBlock(1, 6, new double[] {//
-                       0.0, 1.1, 0.2, 1.2, 0.2, 1.2})) == null);
-       }
-
-       @Test
-       public void testWrongValuesSingleRowV2() {
-               assertTrue(sh.encode(new MatrixBlock(1, 6, new double[] {//
-                       0.0, 1.0, 0.2, 1.2, 0.2, 1.3})) == null);
+               sh.encode(new MatrixBlock(1, 3, new double[] {//
+                       1.1, 1.2, 1.3}));
        }
 
        @Test
@@ -86,16 +74,6 @@ public class CLAEmptySchemeTest {
                })) != null);
        }
 
-       @Test
-       public void testInvalidEncodeMultiRowsValue() {
-               assertTrue(sh.encode(new MatrixBlock(4, 8, new double[] {//
-                       0.0, 0.0, 0.2, 0.0, 1.2, 0.0, 0.2, 1.3, //
-                       0.0, 0.0, 0.2, 0.0, 1.2, 0.0, 0.2, 1.3, //
-                       0.0, 0.0, 0.2, 0.0, 1.2, 0.0, 0.2, 1.3, //
-                       0.0, 0.0, 0.2, 0.0, 1.2, 0.0, 0.2, 1.3, //
-               })) != null);
-       }
-
        @Test
        public void testValidEncodeMultiRowDifferentValuesOtherColumns() {
                assertTrue(sh.encode(new MatrixBlock(4, 12, new double[] {//
@@ -106,37 +84,6 @@ public class CLAEmptySchemeTest {
                })) != null);
        }
 
-       @Test
-       public void testInvalidEncodeValueMultiRowMultiError() {
-               assertTrue(sh.encode(new MatrixBlock(4, 6, new double[] {//
-                       0.0, 1.1, 0.2, 1.2, 0.2, 1.3, //
-                       0.0, 1.1, 0.2, 1.2, 0.2, 1.3, //
-                       0.0, 1.1, 0.2, 1.2, 0.2, 1.4, //
-                       0.0, 1.1, 0.2, 1.2, 0.2, 1.3, //
-               })) == null);
-       }
-
-       @Test
-       public void testInvalidEncodeMultiRow() {
-               assertTrue(sh.encode(new MatrixBlock(4, 6, new double[] {//
-                       0.0, 1.3, 0.2, 1.2, 0.2, 1.3, //
-                       0.0, 1.1, 0.2, 1.2, 0.2, 1.3, //
-                       0.0, 1.1, 0.2, 1.2, 0.2, 1.4, //
-                       0.0, 1.1, 0.2, 1.2, 0.2, 1.3, //
-               })) == null);
-       }
-
-       @Test
-       public void testEncodeOtherColumns() {
-               assertTrue(sh.encode(new MatrixBlock(4, 5, new double[] {//
-                       1.1, 0.2, 1.2, 0.2, 1.3, //
-                       1.1, 0.2, 1.2, 0.2, 1.3, //
-                       1.1, 0.2, 1.2, 0.2, 1.3, //
-                       1.1, 0.2, 1.2, 0.2, 1.3, //
-               }), ColIndexFactory.create( new int[] {0, 2, 4})// other columns
-               ) == null);
-       }
-
        @Test
        public void testEncodeOtherColumnsValid() {
                assertTrue(sh.encode(new MatrixBlock(4, 8, new double[] {//
@@ -144,29 +91,18 @@ public class CLAEmptySchemeTest {
                        0.0, 1.1, 0.0, 0.2, 0.0, 1.2, 0.2, 1.3, //
                        0.0, 1.1, 0.0, 0.2, 0.0, 1.2, 0.2, 1.3, //
                        0.0, 1.1, 0.0, 0.2, 0.0, 1.2, 0.2, 1.3, //
-               }),  ColIndexFactory.create(new int[] {0, 2, 4})// other columns
+               }), ColIndexFactory.create(new int[] {0, 2, 4})// other columns
                ) != null);
        }
 
-       @Test
-       public void testEncodeOtherColumnsInvalid() {
-               assertTrue(sh.encode(new MatrixBlock(4, 5, new double[] {//
-                       1.1, 0.2, 1.2, 0.2, 1.3, //
-                       1.1, 0.2, 1.2, 0.2, 1.3, //
-                       1.1, 0.2, 1.4, 0.2, 1.3, //
-                       1.1, 0.2, 1.2, 0.2, 1.3, //
-               }),  ColIndexFactory.create(new int[] {0, 2, 4})// other columns
-               ) == null);
-       }
-
        @Test(expected = IllegalArgumentException.class)
        public void testInvalidArgument_1() {
-               sh.encode(null, ColIndexFactory.create( new int[] {0, 2, 4, 
5}));
+               sh.encode(null, ColIndexFactory.create(new int[] {0, 2, 4, 5}));
        }
 
        @Test(expected = IllegalArgumentException.class)
        public void testInvalidArgument_2() {
-               sh.encode(null, ColIndexFactory.create( new int[] {0, 2}));
+               sh.encode(null, ColIndexFactory.create(new int[] {0, 2}));
        }
 
        @Test
@@ -196,22 +132,7 @@ public class CLAEmptySchemeTest {
                MatrixBlock empty = new MatrixBlock(4, 1000, 0.0);
                mb = mb.append(empty);
 
-               assertTrue(sh.encode(mb,  ColIndexFactory.create(new int[] 
{100, 102, 999})) != null);
-       }
-
-       @Test
-       public void testSpars_InsideInvalid() {
-               MatrixBlock mb = new MatrixBlock(4, 6, new double[] {//
-                       0.01, 0.0, 0.2, 0.0, 0.2, 0.0, //
-                       0.01, 0.0, 0.2, 0.0, 0.2, 0.0, //
-                       0.01, 0.0, 0.2, 0.0, 0.2, 0.0, //
-                       0.01, 0.0, 0.2, 0.0, 0.2, 0.0, //
-               });
-
-               MatrixBlock empty = new MatrixBlock(4, 1000, 0.0);
-               mb = mb.append(empty);
-
-               assertTrue(sh.encode(mb,  ColIndexFactory.create(new int[] {1, 
4, 5})) == null);
+               assertTrue(sh.encode(mb, ColIndexFactory.create(new int[] {100, 
102, 999})) != null);
        }
 
        @Test
@@ -241,7 +162,7 @@ public class CLAEmptySchemeTest {
                MatrixBlock empty = new MatrixBlock(4, 1000, 0.0);
                mb = empty.append(mb);
 
-               assertTrue(sh.encode(mb,  ColIndexFactory.create(new int[] 
{1001, 1003, 1005})) != null);
+               assertTrue(sh.encode(mb, ColIndexFactory.create(new int[] 
{1001, 1003, 1005})) != null);
        }
 
        @Test
@@ -256,7 +177,7 @@ public class CLAEmptySchemeTest {
                MatrixBlock empty = new MatrixBlock(4, 1000, 0.0);
                MatrixBlock comb = empty.append(mb).append(mb);
 
-               assertTrue(sh.encode(comb,  ColIndexFactory.create(new int[] 
{1001, 1003, 1005})) != null);
+               assertTrue(sh.encode(comb, ColIndexFactory.create(new int[] 
{1001, 1003, 1005})) != null);
        }
 
        @Test
@@ -271,7 +192,7 @@ public class CLAEmptySchemeTest {
                MatrixBlock empty = new MatrixBlock(4, 1000, 0.0);
                MatrixBlock comb = empty.append(mb).append(mb);
 
-               assertTrue(sh.encode(comb,  ColIndexFactory.create(new int[] 
{1001, 1003, 1005})) != null);
+               assertTrue(sh.encode(comb, ColIndexFactory.create(new int[] 
{1001, 1003, 1005})) != null);
        }
 
        @Test
@@ -288,7 +209,7 @@ public class CLAEmptySchemeTest {
                MatrixBlock emptyRow = new MatrixBlock(1, 1006, 0.0);
                mb = mb.append(emptyRow, false);
 
-               assertTrue(sh.encode(mb,  ColIndexFactory.create(new int[] {44, 
45, 999})) != null);
+               assertTrue(sh.encode(mb, ColIndexFactory.create(new int[] {44, 
45, 999})) != null);
        }
 
        @Test
@@ -300,7 +221,7 @@ public class CLAEmptySchemeTest {
        @Test
        public void testEmptyOtherColumns() {
                MatrixBlock empty = new MatrixBlock(4, 1000, 0.0);
-               assertTrue(sh.encode(empty,  ColIndexFactory.create(new int[] 
{33, 34, 99})) != null);
+               assertTrue(sh.encode(empty, ColIndexFactory.create(new int[] 
{33, 34, 99})) != null);
        }
 
        @Test
@@ -316,19 +237,6 @@ public class CLAEmptySchemeTest {
                assertTrue(sh.encode(mb) != null);
        }
 
-       @Test
-       public void testGenericNonContinuosBlockInValid() {
-               MatrixBlock mb = new MatrixBlock(4, 6, //
-                       new DenseBlockFP64Mock(new int[] {4, 6}, new double[] 
{//
-                               0.2, 1.1, 0.4, 1.2, 0.3, 1.3, //
-                               0.0, 1.1, 0.2, 1.2, 0.2, 1.3, //
-                               0.0, 1.1, 0.2, 1.2, 0.1, 1.3, //
-                               0.2, 1.22, 0.4, 1.2, 0.1, 1.3, //
-                       }));
-               mb.recomputeNonZeros();
-               assertTrue(sh.encode(mb) == null);
-       }
-
        @Test(expected = NullPointerException.class)
        public void testNull() {
                sh.encode(null, null);
diff --git 
a/src/test/java/org/apache/sysds/test/component/compress/io/IOTest.java 
b/src/test/java/org/apache/sysds/test/component/compress/io/IOTest.java
index 003ef85d08..296bd84e07 100644
--- a/src/test/java/org/apache/sysds/test/component/compress/io/IOTest.java
+++ b/src/test/java/org/apache/sysds/test/component/compress/io/IOTest.java
@@ -141,7 +141,7 @@ public class IOTest {
        }
 
        protected static void writeAndRead(MatrixBlock mb, int blen) throws 
Exception {
-               try{
+               try {
 
                        String filename = getName();
                        WriterCompressed.writeCompressedMatrixToHDFS(mb, 
filename, blen);
@@ -150,7 +150,7 @@ public class IOTest {
                        MatrixBlock mbr = IOCompressionTestUtils.read(filename);
                        IOCompressionTestUtils.verifyEquivalence(mb, mbr);
                }
-               catch(Exception e){
+               catch(Exception e) {
                        e.printStackTrace();
                        throw e;
                }
diff --git 
a/src/test/java/org/apache/sysds/test/component/compress/plan/CustomEncodePlanTest.java
 
b/src/test/java/org/apache/sysds/test/component/compress/plan/CustomEncodePlanTest.java
new file mode 100644
index 0000000000..5a224c7677
--- /dev/null
+++ 
b/src/test/java/org/apache/sysds/test/component/compress/plan/CustomEncodePlanTest.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.component.compress.plan;
+
+import static org.junit.Assert.fail;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sysds.runtime.compress.colgroup.AColGroup.CompressionType;
+import org.apache.sysds.runtime.compress.plan.CompressionPlanFactory;
+import org.apache.sysds.runtime.compress.plan.IPlanEncode;
+import org.apache.sysds.runtime.matrix.data.MatrixBlock;
+import org.apache.sysds.test.TestUtils;
+import org.junit.Test;
+
+public class CustomEncodePlanTest {
+       protected static final Log LOG = LogFactory.getLog(CustomEncodePlanTest.class.getName());
+
+       @Test
+       public void testEncodeDDCConst() {
+               testDDCSingle(new MatrixBlock(10, 2, 3.0));
+       }
+
+       @Test
+       public void testEncodeDDCNonZero1() {
+               testDDCSingle(TestUtils.generateTestMatrixBlock(10, 3, 1, 1, 0.5, 235));
+       }
+
+       @Test
+       public void testEncodeDDCNonZero2() {
+               testDDCSingle(TestUtils.generateTestMatrixBlock(10, 3, 1, 1, 0.9, 235));
+       }
+
+       @Test
+       public void testEncodeDDCConstTwo() {
+               testDDCTwo(new MatrixBlock(10, 2, 3.0));
+       }
+
+       @Test
+       public void testEncodeDDCNonZeroTwo1() {
+               testDDCTwo(TestUtils.generateTestMatrixBlock(10, 3, 1, 1, 0.5, 235));
+       }
+
+       @Test
+       public void testEncodeDDCNonZeroTwo2() {
+               testDDCTwo(TestUtils.generateTestMatrixBlock(10, 3, 1, 1, 0.9, 235));
+       }
+
+       @Test
+       public void testEncodeDDCNonZeroTwo3() {
+               testDDCTwo(TestUtils.generateTestMatrixBlock(10, 3, 1, 1, 0.1, 235));
+       }
+
+       @Test
+       public void testEncodeDDCNonZeroTwo4() {
+               testDDCTwo(TestUtils.generateTestMatrixBlock(10, 3, 1, 1, 1.0, 235));
+       }
+
+       @Test
+       public void testEncodeDDCNonZeroTwo5() {
+               testDDCTwo(TestUtils.generateTestMatrixBlock(10, 3, 1, 1, 0.0, 235));
+       }
+
+       @Test
+       public void testEncodeDDCConstN() {
+               testDDCn(new MatrixBlock(10, 2, 3.0), 3);
+       }
+
+       @Test
+       public void testEncodeDDCNonZeroN1() {
+               testDDCn(TestUtils.generateTestMatrixBlock(10, 5, 1, 1, 0.5, 235), 3);
+       }
+
+       @Test
+       public void testEncodeDDCNonZeroN2() {
+               testDDCn(TestUtils.generateTestMatrixBlock(10, 4, 1, 1, 0.9, 235), 3);
+       }
+
+       private void testDDCSingle(MatrixBlock mb) {
+               try {
+
+                       IPlanEncode plan = CompressionPlanFactory.singleCols(mb.getNumColumns(), CompressionType.DDC, 1);
+                       testPlan(mb, plan);
+               }
+               catch(Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+
+       protected void testDDCTwo(MatrixBlock mb) {
+               try {
+
+                       IPlanEncode plan = CompressionPlanFactory.twoCols(mb.getNumColumns(), CompressionType.DDC, 1);
+                       testPlan(mb, plan);
+               }
+               catch(Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+
+       protected void testDDCn(MatrixBlock mb, int n) {
+               try {
+                       IPlanEncode plan = CompressionPlanFactory.nCols(mb.getNumColumns(), n, CompressionType.DDC, 1);
+                       testPlan(mb, plan);
+               }
+               catch(Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+
+       protected void testPlan(MatrixBlock mb, IPlanEncode plan) {
+               try {
+
+                       plan.expandPlan(mb);
+                       MatrixBlock cmb = plan.encode(mb);
+                       TestUtils.compareMatrices(mb, cmb, 0);
+               }
+               catch(Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+}
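
    For orientation, a minimal standalone sketch of how the new plan-based encode API is
    driven, assembled only from the calls exercised in CustomEncodePlanTest above (a factory
    call such as nCols with a column-group size, then expandPlan, then encode). The class name
    PlanEncodeSketch and the concrete sizes and values are hypothetical and not part of this
    patch:

        import org.apache.sysds.runtime.compress.colgroup.AColGroup.CompressionType;
        import org.apache.sysds.runtime.compress.plan.CompressionPlanFactory;
        import org.apache.sysds.runtime.compress.plan.IPlanEncode;
        import org.apache.sysds.runtime.matrix.data.MatrixBlock;

        public class PlanEncodeSketch {
            public static void main(String[] args) {
                // arbitrary dense input, used only for illustration
                MatrixBlock mb = new MatrixBlock(10, 4, 3.0);
                // plan: compress the columns in groups of 2 as DDC, using a single thread
                IPlanEncode plan = CompressionPlanFactory.nCols(mb.getNumColumns(), 2, CompressionType.DDC, 1);
                // expand the plan over the input, then apply it to obtain the encoded block
                plan.expandPlan(mb);
                MatrixBlock cmb = plan.encode(mb);
                // the tests above compare cmb against mb element-wise with zero tolerance
                System.out.println(cmb.getNumRows() + " x " + cmb.getNumColumns());
            }
        }
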
diff --git a/src/test/java/org/apache/sysds/test/component/compress/plan/EncodePerformanceTest.java b/src/test/java/org/apache/sysds/test/component/compress/plan/EncodePerformanceTest.java
new file mode 100644
index 0000000000..3ff57a3076
--- /dev/null
+++ b/src/test/java/org/apache/sysds/test/component/compress/plan/EncodePerformanceTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.component.compress.plan;
+
+import static org.junit.Assert.fail;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sysds.runtime.compress.colgroup.AColGroup.CompressionType;
+import org.apache.sysds.runtime.compress.plan.CompressionPlanFactory;
+import org.apache.sysds.runtime.compress.plan.IPlanEncode;
+import org.apache.sysds.runtime.controlprogram.parfor.stat.Timing;
+import org.apache.sysds.runtime.matrix.data.MatrixBlock;
+import org.apache.sysds.test.TestUtils;
+
+public class EncodePerformanceTest {
+       protected static final Log LOG = LogFactory.getLog(EncodePerformanceTest.class.getName());
+
+       public static void main(String[] args) {
+               // manually run test
+               MatrixBlock mb = TestUtils.generateTestMatrixBlock(1000, 300, 1, 1, 0.5, 235);
+               IPlanEncode plan = CompressionPlanFactory.nCols(mb.getNumColumns(), 10, CompressionType.DDC, 16);
+               testExpand(mb, plan);
+               testEncode(mb, plan);
+       }
+
+       private static void testExpand(MatrixBlock mb, IPlanEncode plan) {
+               try {
+
+                       for(int j = 0; j < 5; j++) {
+                               Timing time = new Timing(true);
+                               plan.expandPlan(mb);
+                               LOG.error(time.stop());
+                       }
+                       for(int i = 0; i < 10000; i++) {
+                               Timing time = new Timing(true);
+                               for(int j = 0; j < 100; j++) {
+                                       plan.expandPlan(mb);
+                               }
+                               LOG.error(time.stop());
+                       }
+                       MatrixBlock cmb = plan.encode(mb);
+                       TestUtils.compareMatrices(mb, cmb, 0);
+               }
+               catch(Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+
+       private static void testEncode(MatrixBlock mb, IPlanEncode plan) {
+               try {
+                       MatrixBlock cmb = null;
+                       plan.expandPlan(mb);
+                       for(int i = 0; i < 10000; i++) {
+                               Timing time = new Timing(true);
+                               for(int j = 0; j < 100; j++) {
+                                       cmb = plan.encode(mb);
+                               }
+                               LOG.error(time.stop());
+                       }
+                       TestUtils.compareMatrices(mb, cmb, 0);
+               }
+               catch(Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+}
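
    Note that EncodePerformanceTest is not a JUnit test (it defines no @Test methods); it is a
    micro-benchmark meant to be launched manually via its main method, and it reports the
    per-iteration timings through LOG.error, presumably so the numbers are printed regardless
    of the configured log level.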
