This is an automated email from the ASF dual-hosted git repository.

baunsgaard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git

commit 67bb8034611a766ca93c0314ee735a3252e79290
Author: baunsgaard <[email protected]>
AuthorDate: Mon Aug 30 16:28:31 2021 +0200

    [SYSTEMDS-2610] CLA updates
    
    - Compressed matrix factory improvements
    - Decompress before serialization if the compressed format would be
    larger than the uncompressed format
    - Decompress on write to HDFS
    - Abort compression after cocode if the compression sizes are bad
    - Treat cbind as a decompressing operation in the workload tree (worst case)
    - Add a minimum compression ratio argument to the CompressionSettings
    - Reduce the sampling size in cbind compression and set a high minimum
    compression ratio
    - Fix order of operations in compressed append
    - Add compressed output size to unary hops
    - Make more use of the cached decompressed matrix, if it fits in memory,
    by checking for a soft reference to the uncompressed version in certain cases
---
 src/main/java/org/apache/sysds/hops/UnaryOp.java   |  12 +-
 .../ipa/IPAPassCompressionWorkloadAnalysis.java    |   1 +
 .../hops/rewrite/RewriteCompressedReblock.java     |   2 +-
 .../runtime/compress/CompressedMatrixBlock.java    |  29 ++-
 .../compress/CompressedMatrixBlockFactory.java     |  98 +++++++-
 .../runtime/compress/CompressionSettings.java      |   8 +-
 .../compress/CompressionSettingsBuilder.java       |   8 +-
 .../runtime/compress/cocode/AColumnCoCoder.java    |  22 +-
 .../runtime/compress/cocode/CoCodeBinPacking.java  |   4 +-
 .../runtime/compress/cocode/CoCodeGreedy.java      |   2 +-
 .../runtime/compress/cocode/CoCodePriorityQue.java |   4 +-
 .../compress/colgroup/ColGroupUncompressed.java    |  19 +-
 .../compress/colgroup/mapping/MapToFactory.java    |  12 +-
 .../compress/cost/CostEstimatorBuilder.java        |   5 +
 .../compress/cost/CostEstimatorFactory.java        |   2 +-
 .../compress/estim/CompressedSizeEstimator.java    |  43 +++-
 .../estim/CompressedSizeEstimatorExact.java        |   5 +-
 .../estim/CompressedSizeEstimatorFactory.java      |  30 ++-
 .../estim/CompressedSizeEstimatorSample.java       |  11 +-
 .../compress/estim/CompressedSizeInfoColGroup.java |   2 +-
 .../sysds/runtime/compress/lib/CLALibAppend.java   |  32 +--
 .../runtime/compress/lib/CLALibBinaryCellOp.java   |  42 ++--
 .../sysds/runtime/compress/lib/CLALibCompAgg.java  |  51 ++--
 .../runtime/compress/lib/CLALibRelationalOp.java   | 267 ---------------------
 .../sysds/runtime/compress/lib/CLALibScalar.java   |   7 -
 .../compress/workload/WorkloadAnalyzer.java        |   5 +-
 .../context/SparkExecutionContext.java             |   8 +-
 .../spark/BinUaggChainSPInstruction.java           |   6 +
 .../component/compress/CompressedMatrixTest.java   |  33 +++
 .../component/compress/CompressedTestBase.java     |  27 ---
 .../component/compress/workload/WorkloadTest.java  |  18 +-
 .../compress/workload/WorkloadAlgorithmTest.java   |   2 +-
 32 files changed, 379 insertions(+), 438 deletions(-)

diff --git a/src/main/java/org/apache/sysds/hops/UnaryOp.java 
b/src/main/java/org/apache/sysds/hops/UnaryOp.java
index b7df277..38199b2 100644
--- a/src/main/java/org/apache/sysds/hops/UnaryOp.java
+++ b/src/main/java/org/apache/sysds/hops/UnaryOp.java
@@ -545,7 +545,7 @@ public class UnaryOp extends MultiThreadedHop
                        setDim2(1);
                }
                else if(_op == OpOp1.TYPEOF || _op == OpOp1.DETECTSCHEMA || _op 
== OpOp1.COLNAMES) {
-                       //TODO theses three builtins should rather be moved to 
unary aggregates
+                       //TODO these three builtins should rather be moved to 
unary aggregates
                        setDim1(1);
                        setDim2(input.getDim2());
                }
@@ -564,6 +564,16 @@ public class UnaryOp extends MultiThreadedHop
                        {
                                setNnz( input.getNnz() );
                        }
+
+                       // if the input is compressed then set the output to be 
compressed as well.
+                       if(input._compressedOutput && ! 
(_op==OpOp1.DECOMPRESS)){
+                               setCompressedOutput(true);
+                               // Setting the compressed output to be 2 x 
larger.
+                               // Just in case we change the compressed 
structure slightly.
+                               // this value is overwritten with correct size 
once the hop is executed
+                               // TODO handle overlapping state, since some 
operations would not lead to compressed output.
+                               setCompressedSize(input.compressedSize() * 2);
+                       }
                }
        }
        
diff --git 
a/src/main/java/org/apache/sysds/hops/ipa/IPAPassCompressionWorkloadAnalysis.java
 
b/src/main/java/org/apache/sysds/hops/ipa/IPAPassCompressionWorkloadAnalysis.java
index 324b272..b23397f 100644
--- 
a/src/main/java/org/apache/sysds/hops/ipa/IPAPassCompressionWorkloadAnalysis.java
+++ 
b/src/main/java/org/apache/sysds/hops/ipa/IPAPassCompressionWorkloadAnalysis.java
@@ -58,6 +58,7 @@ public class IPAPassCompressionWorkloadAnalysis extends 
IPAPass {
                        WTreeRoot tree = e.getValue();
                        CostEstimatorBuilder b = new CostEstimatorBuilder(tree);
                        // filter out compression plans that is known bad
+                       
                        if(b.shouldTryToCompress()){
                                tree.getRoot().setRequiresCompression(tree);
                                for(Hop h : tree.getDecompressList())
diff --git 
a/src/main/java/org/apache/sysds/hops/rewrite/RewriteCompressedReblock.java 
b/src/main/java/org/apache/sysds/hops/rewrite/RewriteCompressedReblock.java
index 6b51a51..2f0f1f6c 100644
--- a/src/main/java/org/apache/sysds/hops/rewrite/RewriteCompressedReblock.java
+++ b/src/main/java/org/apache/sysds/hops/rewrite/RewriteCompressedReblock.java
@@ -128,7 +128,7 @@ public class RewriteCompressedReblock extends 
StatementBlockRewriteRule {
 
        public static boolean satisfiesSizeConstraintsForCompression(Hop hop) {
                if(hop.getDim2() >= 1) {
-                       return (hop.getDim1() >= 1000 && hop.getDim2() < 100) 
|| hop.getDim1() / hop.getDim2() >= 75;
+                       return (hop.getDim1() >= 1000 && hop.getDim2() < 100) 
|| hop.getDim1() / hop.getDim2() >= 75 || (hop.getSparsity() < 0.0001 && 
hop.getDim1() > 1000);
                }
                return false;
        }
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java 
b/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java
index d374d3c..5dcc406 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java
@@ -302,7 +302,10 @@ public class CompressedMatrixBlock extends MatrixBlock {
                return ret;
        }
 
-       private MatrixBlock getCachedDecompressed() {
+       /**
+        * Get the cached decompressed matrix (if it exists otherwise null)
+        */
+       public MatrixBlock getCachedDecompressed() {
                if(decompressedVersion != null) {
                        final MatrixBlock mb = decompressedVersion.get();
                        if(mb != null) {
@@ -453,6 +456,16 @@ public class CompressedMatrixBlock extends MatrixBlock {
 
        @Override
        public void write(DataOutput out) throws IOException {
+               if(getExactSizeOnDisk() > MatrixBlock.estimateSizeOnDisk(rlen, 
clen, nonZeros)) {
+                       // decompress and make a uncompressed column group. 
this is then used for the serialization, since it is
+                       // smaller.
+                       // throw new NotImplementedException("Decompressing 
serialization is not implemented");
+
+                       MatrixBlock uncompressed = 
getUncompressed("Decompressing serialization for smaller serialization");
+                       ColGroupUncompressed cg = new 
ColGroupUncompressed(uncompressed);
+                       allocateColGroup(cg);
+                       nonZeros = cg.getNumberNonZeros();
+               }
                // serialize compressed matrix block
                out.writeInt(rlen);
                out.writeInt(clen);
@@ -492,11 +505,15 @@ public class CompressedMatrixBlock extends MatrixBlock {
 
        @Override
        public MatrixBlock binaryOperations(BinaryOperator op, MatrixValue 
thatValue, MatrixValue result) {
-               return CLALibBinaryCellOp.binaryOperations(op, this, thatValue, 
result);
+               MatrixBlock that = thatValue == null ? null : (MatrixBlock) 
thatValue;
+               MatrixBlock ret = result == null ? null : (MatrixBlock) result;
+               return CLALibBinaryCellOp.binaryOperations(op, this, that, ret);
        }
 
        public MatrixBlock binaryOperationsLeft(BinaryOperator op, MatrixValue 
thatValue, MatrixValue result) {
-               return CLALibBinaryCellOp.binaryOperationsLeft(op, this, 
thatValue, result);
+               MatrixBlock that = thatValue == null ? null : (MatrixBlock) 
thatValue;
+               MatrixBlock ret = result == null ? null : (MatrixBlock) result;
+               return CLALibBinaryCellOp.binaryOperationsLeft(op, this, that, 
ret);
        }
 
        @Override
@@ -686,8 +703,8 @@ public class CompressedMatrixBlock extends MatrixBlock {
                                .aggregateUnaryOperations(op, result, blen, 
indexesIn, inCP);
 
                }
-
-               return CLALibCompAgg.aggregateUnary(this, result, op, blen, 
indexesIn, inCP);
+               MatrixBlock ret = (result == null) ? null : (MatrixBlock) 
result;
+               return CLALibCompAgg.aggregateUnary(this, ret, op, blen, 
indexesIn, inCP);
        }
 
        @Override
@@ -1080,7 +1097,7 @@ public class CompressedMatrixBlock extends MatrixBlock {
                boolean m2C = m2 instanceof CompressedMatrixBlock;
                boolean m3C = m3 instanceof CompressedMatrixBlock;
                printDecompressWarning("aggregateTernaryOperations " + 
op.aggOp.getClass().getSimpleName() + " "
-                       + op.indexFn.getClass().getSimpleName() + "  " + 
op.aggOp.increOp.fn.getClass().getSimpleName() + " "
+                       + op.indexFn.getClass().getSimpleName() + " " + 
op.aggOp.increOp.fn.getClass().getSimpleName() + " "
                        + op.binaryFn.getClass().getSimpleName() + " m1,m2,m3 " 
+ m1C + " " + m2C + " " + m3C);
                MatrixBlock left = getUncompressed();
                MatrixBlock right1 = getUncompressed(m2);
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlockFactory.java
 
b/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlockFactory.java
index 20beaf5..5f45d56 100644
--- 
a/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlockFactory.java
+++ 
b/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlockFactory.java
@@ -32,18 +32,21 @@ import org.apache.sysds.runtime.compress.colgroup.AColGroup;
 import org.apache.sysds.runtime.compress.colgroup.ColGroupConst;
 import org.apache.sysds.runtime.compress.colgroup.ColGroupEmpty;
 import org.apache.sysds.runtime.compress.colgroup.ColGroupFactory;
+import org.apache.sysds.runtime.compress.colgroup.ColGroupUncompressed;
 import org.apache.sysds.runtime.compress.colgroup.ColGroupValue;
 import org.apache.sysds.runtime.compress.colgroup.dictionary.Dictionary;
+import org.apache.sysds.runtime.compress.cost.ComputationCostEstimator;
 import org.apache.sysds.runtime.compress.cost.CostEstimatorFactory;
 import org.apache.sysds.runtime.compress.cost.ICostEstimate;
-import org.apache.sysds.runtime.compress.cost.MemoryCostEstimator;
 import org.apache.sysds.runtime.compress.estim.CompressedSizeEstimator;
 import org.apache.sysds.runtime.compress.estim.CompressedSizeEstimatorFactory;
 import org.apache.sysds.runtime.compress.estim.CompressedSizeInfo;
+import org.apache.sysds.runtime.compress.estim.CompressedSizeInfoColGroup;
 import org.apache.sysds.runtime.compress.utils.DblArrayIntListHashMap;
 import org.apache.sysds.runtime.compress.utils.DoubleCountHashMap;
 import org.apache.sysds.runtime.compress.workload.WTreeRoot;
 import org.apache.sysds.runtime.controlprogram.parfor.stat.Timing;
+import org.apache.sysds.runtime.data.SparseBlock;
 import org.apache.sysds.runtime.matrix.data.LibMatrixReorg;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 import org.apache.sysds.utils.DMLCompressionStatistics;
@@ -65,7 +68,7 @@ public class CompressedMatrixBlockFactory {
        private final CompressionSettings compSettings;
        /** The main cost estimator used for the compression */
        private final ICostEstimate costEstimator;
-       
+
        /** Time stamp of last phase */
        private double lastPhase;
        /** Pointer to the original matrix Block that is about to be 
compressed. */
@@ -153,6 +156,20 @@ public class CompressedMatrixBlockFactory {
        }
 
        /**
+        * Generate a CompressedMatrixBlock Object that contains a single 
uncompressed matrix block column group.
+        * 
+        * @param mb The matrix block to be contained in the uncompressed 
matrix block column,
+        * @return a CompressedMatrixBlock
+        */
+       public static CompressedMatrixBlock 
genUncompressedCompressedMatrixBlock(MatrixBlock mb) {
+               CompressedMatrixBlock ret = new 
CompressedMatrixBlock(mb.getNumRows(), mb.getNumColumns());
+               AColGroup cg = new ColGroupUncompressed(mb);
+               ret.allocateColGroup(cg);
+               ret.setNonZeros(mb.getNonZeros());
+               return ret;
+       }
+
+       /**
         * Method for constructing a compressed matrix out of an constant input.
         * 
         * Since the input is a constant value it is trivially compressable, 
therefore we skip the entire compression
@@ -191,9 +208,13 @@ public class CompressedMatrixBlockFactory {
 
                res = new CompressedMatrixBlock(mb); // copy metadata and 
allocate soft reference
 
-               classifyPhase();
-               if(coCodeColGroups == null)
-                       return abortCompression();
+               looksLikeOneHot();
+
+               if(coCodeColGroups == null) {
+                       classifyPhase();
+                       if(coCodeColGroups == null)
+                               return abortCompression();
+               }
 
                transposePhase();
                compressPhase();
@@ -217,7 +238,13 @@ public class CompressedMatrixBlockFactory {
                _stats.estimatedSizeCols = sizeInfos.memoryEstimate();
                logPhase();
 
-               if(!(costEstimator instanceof MemoryCostEstimator) || 
_stats.estimatedSizeCols < _stats.originalSize)
+               final boolean isValidForComputeBasedCompression = 
isComputeBasedCompression() &&
+                       (compSettings.minimumCompressionRatio != 1.0) ? 
_stats.estimatedSizeCols *
+                               compSettings.minimumCompressionRatio < 
_stats.originalSize : true;
+               final boolean isValidForMemoryBasedCompression = 
_stats.estimatedSizeCols *
+                       compSettings.minimumCompressionRatio < 
_stats.originalSize;
+
+               if(isValidForComputeBasedCompression || 
isValidForMemoryBasedCompression)
                        coCodePhase(sizeEstimator, sizeInfos, costEstimator);
                else {
                        LOG.info("Estimated Size of singleColGroups: " + 
_stats.estimatedSizeCols);
@@ -225,13 +252,72 @@ public class CompressedMatrixBlockFactory {
                }
        }
 
+       private boolean isComputeBasedCompression() {
+               return costEstimator instanceof ComputationCostEstimator;
+       }
+
        private void coCodePhase(CompressedSizeEstimator sizeEstimator, 
CompressedSizeInfo sizeInfos,
                ICostEstimate costEstimator) {
                coCodeColGroups = 
CoCoderFactory.findCoCodesByPartitioning(sizeEstimator, sizeInfos, k, 
costEstimator,
                        compSettings);
 
                _stats.estimatedSizeCoCoded = coCodeColGroups.memoryEstimate();
+
                logPhase();
+
+               // if cocode is estimated larger than uncompressed abort 
compression.
+               if(isComputeBasedCompression() &&
+                       _stats.estimatedSizeCoCoded * 
compSettings.minimumCompressionRatio > _stats.originalSize) {
+
+                       coCodeColGroups = null;
+                       LOG.info("Aborting compression because the cocoded size 
: " + _stats.estimatedSizeCoCoded);
+                       LOG.info("Vs original size                              
: " + _stats.originalSize);
+               }
+
+       }
+
+       private void looksLikeOneHot() {
+               final int numColumns = mb.getNumColumns();
+               final int numRows = mb.getNumRows();
+               final long nnz = mb.getNonZeros();
+               final int colGroupSize = 100;
+               if(nnz == numRows) {
+                       boolean onlyOneValues = true;
+                       LOG.debug("Looks like one hot encoded.");
+                       if(mb.isInSparseFormat()) {
+                               final SparseBlock sb = mb.getSparseBlock();
+                               for(double v : sb.get(0).values()) {
+                                       onlyOneValues = v == 1.0;
+                                       if(!onlyOneValues) {
+                                               break;
+                                       }
+                               }
+                       }
+                       else {
+                               final double[] vals = 
mb.getDenseBlock().values(0);
+                               for(int i = 0; i < Math.min(vals.length, 1000); 
i++) {
+                                       double v = vals[i];
+                                       onlyOneValues = v == 1.0 || v == 0.0;
+                                       if(!onlyOneValues) {
+                                               break;
+                                       }
+                               }
+                       }
+                       if(onlyOneValues) {
+                               List<CompressedSizeInfoColGroup> ng = new 
ArrayList<>(numColumns / colGroupSize + 1);
+                               for(int i = 0; i < numColumns; i += 
colGroupSize) {
+                                       int[] columnIds = new 
int[Math.min(colGroupSize, numColumns - i)];
+                                       for(int j = 0; j < columnIds.length; 
j++)
+                                               columnIds[j] = i + j;
+                                       ng.add(new 
CompressedSizeInfoColGroup(columnIds, Math.min(numColumns, colGroupSize), 
numRows));
+                               }
+                               coCodeColGroups = new CompressedSizeInfo(ng);
+
+                               LOG.debug("Concluded that it probably is one 
hot encoded skipping analysis");
+                               // skipping two phases
+                               phase += 2;
+                       }
+               }
        }
 
        private void transposePhase() {
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/CompressionSettings.java 
b/src/main/java/org/apache/sysds/runtime/compress/CompressionSettings.java
index 6dfcdae..ea04e8e 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/CompressionSettings.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/CompressionSettings.java
@@ -103,10 +103,15 @@ public class CompressionSettings {
         */
        public boolean transposed = false;
 
+       /**
+        * The minimum compression ratio to achieve.
+        */
+       public final double minimumCompressionRatio;
+
        protected CompressionSettings(double samplingRatio, boolean 
allowSharedDictionary, String transposeInput,
                 int seed, boolean lossy, EnumSet<CompressionType> 
validCompressions,
                boolean sortValuesByLength, PartitionerType columnPartitioner, 
int maxColGroupCoCode, double coCodePercentage,
-               int minimumSampleSize, EstimationType estimationType, CostType 
costComputationType) {
+               int minimumSampleSize, EstimationType estimationType, CostType 
costComputationType, double minimumCompressionRatio) {
                this.samplingRatio = samplingRatio;
                this.allowSharedDictionary = allowSharedDictionary;
                this.transposeInput = transposeInput;
@@ -120,6 +125,7 @@ public class CompressionSettings {
                this.minimumSampleSize = minimumSampleSize;
                this.estimationType = estimationType;
                this.costComputationType = costComputationType;
+               this.minimumCompressionRatio = minimumCompressionRatio;
                if(LOG.isDebugEnabled())
                        LOG.debug(this);
        }
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/CompressionSettingsBuilder.java
 
b/src/main/java/org/apache/sysds/runtime/compress/CompressionSettingsBuilder.java
index a56285b..2864118 100644
--- 
a/src/main/java/org/apache/sysds/runtime/compress/CompressionSettingsBuilder.java
+++ 
b/src/main/java/org/apache/sysds/runtime/compress/CompressionSettingsBuilder.java
@@ -45,6 +45,7 @@ public class CompressionSettingsBuilder {
        private EstimationType estimationType = EstimationType.HassAndStokes;
        private PartitionerType columnPartitioner;
        private CostType costType;
+       private double minimumCompressionRatio = 1.0;
 
        public CompressionSettingsBuilder() {
 
@@ -267,6 +268,11 @@ public class CompressionSettingsBuilder {
                return this;
        }
 
+       public CompressionSettingsBuilder setMinimumCompressionRatio(double 
ratio){
+               this.minimumCompressionRatio = ratio;
+               return this;
+       }
+
        /**
         * Create the CompressionSettings object to use in the compression.
         * 
@@ -275,6 +281,6 @@ public class CompressionSettingsBuilder {
        public CompressionSettings create() {
                return new CompressionSettings(samplingRatio, 
allowSharedDictionary, transposeInput, seed, lossy,
                        validCompressions, sortValuesByLength, 
columnPartitioner, maxColGroupCoCode, coCodePercentage,
-                       minimumSampleSize, estimationType, costType);
+                       minimumSampleSize, estimationType, 
costType,minimumCompressionRatio);
        }
 }
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/cocode/AColumnCoCoder.java 
b/src/main/java/org/apache/sysds/runtime/compress/cocode/AColumnCoCoder.java
index 41d986c..3bdcf5d 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/cocode/AColumnCoCoder.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/cocode/AColumnCoCoder.java
@@ -26,7 +26,6 @@ import org.apache.sysds.runtime.compress.cost.ICostEstimate;
 import org.apache.sysds.runtime.compress.estim.CompressedSizeEstimator;
 import org.apache.sysds.runtime.compress.estim.CompressedSizeInfo;
 import org.apache.sysds.runtime.compress.estim.CompressedSizeInfoColGroup;
-import org.apache.sysds.runtime.compress.utils.Util;
 
 public abstract class AColumnCoCoder {
 
@@ -52,26 +51,17 @@ public abstract class AColumnCoCoder {
         */
        protected abstract CompressedSizeInfo coCodeColumns(CompressedSizeInfo 
colInfos, int k);
 
-       protected CompressedSizeInfoColGroup join(CompressedSizeInfoColGroup 
lhs, CompressedSizeInfoColGroup rhs,
-               boolean analyze) {
-               return analyze ? joinWithAnalysis(lhs, rhs) : 
joinWithoutAnalysis(lhs, rhs);
+       protected CompressedSizeInfoColGroup join(int[] joined, 
CompressedSizeInfoColGroup lhs,
+               CompressedSizeInfoColGroup rhs, boolean analyze) {
+               return analyze ? _sest.estimateJoinCompressedSize(joined, lhs, 
rhs) : joinWithoutAnalysis(joined, lhs, rhs);
        }
 
-       protected CompressedSizeInfoColGroup 
joinWithAnalysis(CompressedSizeInfoColGroup lhs,
+       protected CompressedSizeInfoColGroup joinWithoutAnalysis(int[] joined, 
CompressedSizeInfoColGroup lhs,
                CompressedSizeInfoColGroup rhs) {
-               return _sest.estimateJoinCompressedSize(lhs, rhs);
-       }
-
-       protected CompressedSizeInfoColGroup 
joinWithoutAnalysis(CompressedSizeInfoColGroup lhs,
-               CompressedSizeInfoColGroup rhs) {
-               int[] joined = Util.join(lhs.getColumns(), rhs.getColumns());
                final int lhsV = lhs.getNumVals();
                final int rhsV = rhs.getNumVals();
-               final int numVals = lhsV * rhsV;
-               if(numVals < 0 || numVals > _sest.getNumRows())
-                       return null;
-               else
-                       return new CompressedSizeInfoColGroup(joined, numVals, 
_sest.getNumRows());
+               final int joinedMaxDistinct = (int) Math.min((long) lhsV * 
(long) rhsV, (long) _sest.getNumRows());
+               return new CompressedSizeInfoColGroup(joined, 
joinedMaxDistinct, _sest.getNumRows());
        }
 
        protected CompressedSizeInfoColGroup analyze(CompressedSizeInfoColGroup 
g) {
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodeBinPacking.java 
b/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodeBinPacking.java
index 1887bdc..1731ef2 100644
--- 
a/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodeBinPacking.java
+++ 
b/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodeBinPacking.java
@@ -141,7 +141,7 @@ public class CoCodeBinPacking extends AColumnCoCoder {
                        for(int j = 0; j < bins.size(); j++) {
                                double newBinWeight = binWeights[j] - 
c.getCardinalityRatio();
                                if(newBinWeight >= 0 && 
bins.get(j).getColumns().length < MAX_COL_PER_GROUP - 1) {
-                                       bins.set(j, 
joinWithoutAnalysis(bins.get(j), c));
+                                       bins.set(j, 
joinWithoutAnalysis(Util.join(bins.get(j).getColumns(), 
c.getColumns()),bins.get(j), c));
                                        binWeights[j] = newBinWeight;
                                        assigned = true;
                                        break;
@@ -291,7 +291,7 @@ public class CoCodeBinPacking extends AColumnCoCoder {
                                        g = 
CompressedSizeInfoColGroup.addConstGroup(c, left, _cs.validCompressions);
                                else {
                                        st3++;
-                                       g = 
_sest.estimateJoinCompressedSize(left, right);
+                                       g = _sest.estimateJoinCompressedSize(c, 
left, right);
                                }
 
                                if(leftConst || rightConst)
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodeGreedy.java 
b/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodeGreedy.java
index 51bfa46..3091ba0 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodeGreedy.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodeGreedy.java
@@ -163,7 +163,7 @@ public class CoCodeGreedy extends AColumnCoCoder {
                                        g = 
CompressedSizeInfoColGroup.addConstGroup(c, left, _cs.validCompressions);
                                else {
                                        st3++;
-                                       g = 
_sest.estimateJoinCompressedSize(left, right);
+                                       g = _sest.estimateJoinCompressedSize(c, 
left, right);
                                }
 
                                if(leftConst || rightConst)
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodePriorityQue.java 
b/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodePriorityQue.java
index 2e0e7fb..27b678c 100644
--- 
a/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodePriorityQue.java
+++ 
b/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodePriorityQue.java
@@ -68,7 +68,7 @@ public class CoCodePriorityQue extends AColumnCoCoder {
                        while(que.peek() != null) {
 
                                CompressedSizeInfoColGroup r = que.poll();
-                               final CompressedSizeInfoColGroup g = 
joinWithAnalysis(l, r);
+                               final CompressedSizeInfoColGroup g = 
_sest.estimateJoinCompressedSize(l, r);
                                if(g != null) {
                                        final double costOfJoin = 
_cest.getCostOfCollectionOfGroups(que, g);
                                        if(costOfJoin < costBeforeJoin) {
@@ -93,7 +93,7 @@ public class CoCodePriorityQue extends AColumnCoCoder {
                        while(que.peek() != null) {
                                CompressedSizeInfoColGroup r = que.peek();
                                if(_cest.shouldTryJoin(l, r)) {
-                                       CompressedSizeInfoColGroup g = 
joinWithAnalysis(l, r);
+                                       CompressedSizeInfoColGroup g = 
_sest.estimateJoinCompressedSize(l, r);
                                        if(g != null) {
                                                double costOfJoin = 
_cest.getCostOfColumnGroup(g);
                                                double costIndividual = 
_cest.getCostOfColumnGroup(l) + _cest.getCostOfColumnGroup(r);
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupUncompressed.java
 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupUncompressed.java
index ab94646..b1b0540 100644
--- 
a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupUncompressed.java
+++ 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupUncompressed.java
@@ -132,6 +132,23 @@ public class ColGroupUncompressed extends AColGroup {
                _data = data;
        }
 
+       /**
+        * Constructor for allocating a single uncompressed column group.
+        * 
+        * @param data matrix block
+        */
+       public ColGroupUncompressed(MatrixBlock data) {
+               super(generateColumnList(data.getNumColumns()));
+               _data = data;
+       }
+
+       private static int[] generateColumnList(int nCol){
+               int[] cols = new int[nCol];
+               for(int i = 0; i< nCol; i++)
+                       cols[i] = i;
+               return cols;
+       }
+
        @Override
        public CompressionType getCompType() {
                return CompressionType.UNCOMPRESSED;
@@ -632,7 +649,7 @@ public class ColGroupUncompressed extends AColGroup {
                        double[] dv = colSum.getDenseBlockValues();
                        for(int i = 0; i < _colIndexes.length; i++)
                                c[_colIndexes[i]] += dv[i];
-                       
+
                }
        }
 }
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/colgroup/mapping/MapToFactory.java
 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/mapping/MapToFactory.java
index 5a72ff7..6c0b9fa 100644
--- 
a/src/main/java/org/apache/sysds/runtime/compress/colgroup/mapping/MapToFactory.java
+++ 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/mapping/MapToFactory.java
@@ -108,16 +108,13 @@ public final class MapToFactory {
                final int nVL = left.getUnique();
                final int nVR = right.getUnique();
                final int size = left.size();
-               final int maxUnique = nVL * nVR;
+               final long maxUnique = nVL * nVR;
+               if(maxUnique > (long)Integer.MAX_VALUE)
+                       throw new DMLCompressionException("Joining impossible 
using linearized join, since each side has a large number of unique values");
                if(size != right.size())
                        throw new DMLCompressionException("Invalid input maps 
to join, must contain same number of rows");
 
-               try {
-                       return computeJoin(left, right, size, nVL, maxUnique);
-               }
-               catch(Exception e) {
-                       throw new DMLCompressionException("Joining failed max 
unique expected:" + maxUnique, e);
-               }
+               return computeJoin(left, right, size, nVL, (int)maxUnique);
        }
 
        private static AMapToData computeJoin(AMapToData left, AMapToData 
right, int size, int nVL, int maxUnique) {
@@ -141,7 +138,6 @@ public final class MapToFactory {
                }
 
                tmp.setUnique(newUID-1);
-               // LOG.error(Arrays.toString(map));
                return tmp;
        }
 }
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/cost/CostEstimatorBuilder.java
 
b/src/main/java/org/apache/sysds/runtime/compress/cost/CostEstimatorBuilder.java
index d7f305c..3d58bff 100644
--- 
a/src/main/java/org/apache/sysds/runtime/compress/cost/CostEstimatorBuilder.java
+++ 
b/src/main/java/org/apache/sysds/runtime/compress/cost/CostEstimatorBuilder.java
@@ -125,6 +125,11 @@ public final class CostEstimatorBuilder implements 
Serializable {
                numberOps += counter.scans + counter.leftMultiplications * 2 + 
counter.rightMultiplications +
                        counter.compressedMultiplications * 4 + 
counter.dictionaryOps;
                numberOps -= counter.decompressions + 
counter.overlappingDecompressions;
+
+               if(counter.decompressions > 1 &&
+                       counter.leftMultiplications + 
counter.rightMultiplications + counter.compressedMultiplications < 1)
+                       // This condition is added for the l2svm and mLogReg y 
dataset, which was being compressed when it should not be.
+                       return false;
                return numberOps > 4;
 
        }
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/cost/CostEstimatorFactory.java
 
b/src/main/java/org/apache/sysds/runtime/compress/cost/CostEstimatorFactory.java
index e8bff06..83b794e 100644
--- 
a/src/main/java/org/apache/sysds/runtime/compress/cost/CostEstimatorFactory.java
+++ 
b/src/main/java/org/apache/sysds/runtime/compress/cost/CostEstimatorFactory.java
@@ -45,7 +45,7 @@ public final class CostEstimatorFactory {
                                        return b.create(nRows, nCols);
                                }
                                else
-                                       return new DistinctCostEstimator(nRows, 
cs);
+                                       return new MemoryCostEstimator();
                        case MEMORY:
                        default:
                                return new MemoryCostEstimator();
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimator.java
 
b/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimator.java
index 3dd092c..ef7d9dd 100644
--- 
a/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimator.java
+++ 
b/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimator.java
@@ -82,7 +82,7 @@ public abstract class CompressedSizeEstimator {
                return _numCols;
        }
 
-       public MatrixBlock getData(){
+       public MatrixBlock getData() {
                return _data;
        }
 
@@ -163,7 +163,7 @@ public abstract class CompressedSizeEstimator {
         * @param colIndexes The columns to group together inside a ColGroup
         * @return The CompressedSizeInformation associated with the selected 
ColGroups.
         */
-       public CompressedSizeInfoColGroup estimateCompressedColGroupSize(int[] 
colIndexes){
+       public CompressedSizeInfoColGroup estimateCompressedColGroupSize(int[] 
colIndexes) {
                return estimateCompressedColGroupSize(colIndexes, 8, 
getNumRows());
        }
 
@@ -173,7 +173,7 @@ public abstract class CompressedSizeEstimator {
         * the number estimated in sub groups of the given colIndexes.
         * 
         * @param colIndexes         The columns to extract compression 
information from
-        * @param estimate                       An estimate of number of 
unique elements in these columns
+        * @param estimate           An estimate of number of unique elements 
in these columns
         * @param nrUniqueUpperBound The upper bound of unique elements allowed 
in the estimate, can be calculated from the
         *                           number of unique elements estimated in sub 
columns multiplied together. This is
         *                           flexible in the sense that if the sample 
is small then this unique can be manually
@@ -181,13 +181,16 @@ public abstract class CompressedSizeEstimator {
         * 
         * @return The CompressedSizeInfoColGroup fro the given column indexes.
         */
-       public abstract CompressedSizeInfoColGroup 
estimateCompressedColGroupSize(int[] colIndexes, int estimate, int 
nrUniqueUpperBound);
+       public abstract CompressedSizeInfoColGroup 
estimateCompressedColGroupSize(int[] colIndexes, int estimate,
+               int nrUniqueUpperBound);
 
        /**
         * Join two analyzed column groups together. without materializing the 
dictionaries of either side.
         * 
-        * If either side was constructed without analysis then fall back to 
default materialization of double arrays.
+        * If the product of the number of distinct elements on both sides is 
larger than Integer.MAX_VALUE, return null.
         * 
+        * If either side was constructed without analysis then fall back to 
default materialization of double arrays.
+        * 
         * @param g1 First group
         * @param g2 Second group
         * @return A joined compressed size estimation for the group.
@@ -195,19 +198,39 @@ public abstract class CompressedSizeEstimator {
        public CompressedSizeInfoColGroup 
estimateJoinCompressedSize(CompressedSizeInfoColGroup g1,
                CompressedSizeInfoColGroup g2) {
                final int[] joined = Util.join(g1.getColumns(), 
g2.getColumns());
+               return estimateJoinCompressedSize(joined, g1, g2);
+       }
+
+       /**
+        * Join two analyzed column groups together. without materializing the 
dictionaries of either side.
+        * 
+        * If the product of the number of distinct elements on both sides is 
larger than Integer.MAX_VALUE, return null.
+        * 
+        * If either side was constructed without analysis then fall back to 
default materialization of double arrays.
+        * 
+        * @param joined The joined column indexes.
+        * @param g1     First group
+        * @param g2     Second group
+        * @return A joined compressed size estimation for the group.
+        */
+       public CompressedSizeInfoColGroup estimateJoinCompressedSize(int[] 
joined, CompressedSizeInfoColGroup g1,
+               CompressedSizeInfoColGroup g2) {
                final int g1V = g1.getNumVals();
                final int g2V = g2.getNumVals();
-               if(g1V * g2V < 0 || g1V * g2V > getNumRows())
+               if((long) g1V * g2V > (long) Integer.MAX_VALUE)
                        return null;
-               else if((g1.getMap() == null && g2V != 0) || (g2.getMap() == 
null && g2V != 0))
-                       return estimateCompressedColGroupSize(joined, 
Math.max(g1V + 1, g2V+ 1), Math.min((g1V + 1) * (g2V + 1), getNumRows()));
+
+               final int joinedMaxDistinct = (int) Math.min((long) g1V * 
(long) g2V, getNumRows());
+               if((g1.getMap() == null && g2V != 0) || (g2.getMap() == null && 
g2V != 0))
+                       return estimateCompressedColGroupSize(joined, 
Math.max(g1V + 1, g2V + 1),
+                               Math.min((g1V + 1) * (g2V + 1), getNumRows()));
                else
-                       return estimateJoinCompressedSize(joined, g1, g2);
+                       return estimateJoinCompressedSize(joined, g1, g2, 
joinedMaxDistinct);
 
        }
 
        protected abstract CompressedSizeInfoColGroup 
estimateJoinCompressedSize(int[] joinedcols,
-               CompressedSizeInfoColGroup g1, CompressedSizeInfoColGroup g2);
+               CompressedSizeInfoColGroup g1, CompressedSizeInfoColGroup g2, 
int joinedMaxDistinct);
 
        /**
         * Method used to extract the CompressedSizeEstimationFactors from an 
constructed UncompressedBitmap. Note this
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorExact.java
 
b/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorExact.java
index 824034a..1531781 100644
--- 
a/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorExact.java
+++ 
b/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorExact.java
@@ -45,8 +45,9 @@ public class CompressedSizeEstimatorExact extends 
CompressedSizeEstimator {
        }
 
        @Override
-       public CompressedSizeInfoColGroup estimateJoinCompressedSize(int[] 
joined, CompressedSizeInfoColGroup g1,
-               CompressedSizeInfoColGroup g2) {
+       protected CompressedSizeInfoColGroup estimateJoinCompressedSize(int[] 
joined, CompressedSizeInfoColGroup g1,
+               CompressedSizeInfoColGroup g2, int joinedMaxDistinct) {
+
                AMapToData map = MapToFactory.join(g1.getMap(), g2.getMap());
                EstimationFactors em = 
EstimationFactors.computeSizeEstimation(joined, map,
                        _cs.validCompressions.contains(CompressionType.RLE), 
_numRows, false);
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorFactory.java
 
b/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorFactory.java
index 8b42258..6e9208a 100644
--- 
a/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorFactory.java
+++ 
b/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorFactory.java
@@ -31,22 +31,31 @@ public class CompressedSizeEstimatorFactory {
 
        public static CompressedSizeEstimator getSizeEstimator(MatrixBlock 
data, CompressionSettings cs, int k) {
 
-               final int nRows = cs.transposed ? data.getNumColumns() :  
data.getNumRows();
+               final int nRows = cs.transposed ? data.getNumColumns() : 
data.getNumRows();
                final int nCols = cs.transposed ? data.getNumRows() : 
data.getNumColumns();
                final int nnzRows = (int) Math.ceil(data.getNonZeros() / nCols);
 
                final double sampleRatio = cs.samplingRatio;
                final int sampleSize = Math.min(getSampleSize(sampleRatio, 
nRows, cs.minimumSampleSize), maxSampleSize);
-               if(shouldUseExactEstimator(cs, nRows, sampleSize, nnzRows)) {
-                       if(sampleSize > nnzRows && nRows > 10000 && nCols > 10 
&& !cs.transposed) {
-                               data = LibMatrixReorg.transpose(data,
-                                       new MatrixBlock(data.getNumColumns(), 
data.getNumRows(), data.isInSparseFormat()), k);
-                               cs.transposed = true;
-                       }
-                       return new CompressedSizeEstimatorExact(data, cs);
+
+               if(nCols > 1000) {
+                       return tryToMakeSampleEstimator(data, cs, sampleRatio, 
sampleSize / 10, nRows, nnzRows, k);
                }
                else {
-                       return tryToMakeSampleEstimator(data, cs, sampleRatio, 
sampleSize, nRows, nnzRows, k);
+                       if(shouldUseExactEstimator(cs, nRows, sampleSize, 
nnzRows)) {
+                               if(sampleSize > nnzRows && nRows > 10000 && 
nCols > 10 && !cs.transposed) {
+                                       LOG.info("Transposing for exact 
estimator");
+                                       data = LibMatrixReorg.transpose(data,
+                                               new 
MatrixBlock(data.getNumColumns(), data.getNumRows(), data.isInSparseFormat()), 
k);
+                                       cs.transposed = true;
+                               }
+                               LOG.info("Using Exact estimator");
+                               return new CompressedSizeEstimatorExact(data, 
cs);
+                       }
+                       else {
+                               LOG.info("Trying sample size: " + sampleSize);
+                               return tryToMakeSampleEstimator(data, cs, 
sampleRatio, sampleSize, nRows, nnzRows, k);
+                       }
                }
 
        }
@@ -54,8 +63,9 @@ public class CompressedSizeEstimatorFactory {
        private static CompressedSizeEstimator 
tryToMakeSampleEstimator(MatrixBlock data, CompressionSettings cs,
                double sampleRatio, int sampleSize, int nRows, int nnzRows, int 
k) {
                CompressedSizeEstimatorSample estS = new 
CompressedSizeEstimatorSample(data, cs, sampleSize, k);
+               int double_number = 1;
                while(estS.getSample() == null) {
-                       LOG.warn("Doubling sample size");
+                       LOG.error("Warining doubling sample size " + 
double_number++);
                        sampleSize = sampleSize * 2;
                        if(shouldUseExactEstimator(cs, nRows, sampleSize, 
nnzRows))
                                return new CompressedSizeEstimatorExact(data, 
cs);
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorSample.java
 
b/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorSample.java
index 24ee358..218e12b 100644
--- 
a/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorSample.java
+++ 
b/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorSample.java
@@ -107,18 +107,17 @@ public class CompressedSizeEstimatorSample extends 
CompressedSizeEstimator {
        }
 
        @Override
-       public CompressedSizeInfoColGroup estimateJoinCompressedSize(int[] 
joined, CompressedSizeInfoColGroup g1,
-               CompressedSizeInfoColGroup g2) {
-               final int g1V = g1.getMap().getUnique();
-               final int g2V = g2.getMap().getUnique();
-               final int nrUniqueUpperBound = g1V * g2V;
+       protected CompressedSizeInfoColGroup estimateJoinCompressedSize(int[] 
joined, CompressedSizeInfoColGroup g1,
+               CompressedSizeInfoColGroup g2, int joinedMaxDistinct) {
+               if((long)g1.getNumVals() * g2.getNumVals() 
>(long)Integer.MAX_VALUE )
+                       return null;
 
                final AMapToData map = MapToFactory.join(g1.getMap(), 
g2.getMap());
                EstimationFactors sampleFacts = 
EstimationFactors.computeSizeEstimation(joined, map,
                        _cs.validCompressions.contains(CompressionType.RLE), 
map.size(), false);
 
                // result facts
-               EstimationFactors em = estimateCompressionFactors(sampleFacts, 
map, joined, nrUniqueUpperBound);
+               EstimationFactors em = estimateCompressionFactors(sampleFacts, 
map, joined, joinedMaxDistinct);
                return new CompressedSizeInfoColGroup(em, 
_cs.validCompressions, map);
        }
 
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeInfoColGroup.java
 
b/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeInfoColGroup.java
index e2b7491..1026aa2 100644
--- 
a/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeInfoColGroup.java
+++ 
b/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeInfoColGroup.java
@@ -66,7 +66,7 @@ public class CompressedSizeInfoColGroup {
                _facts = new EstimationFactors(columns, numVals, numRows);
                _cardinalityRatio = (double) numVals / numRows;
                _sizes = null;
-               _bestCompressionType = null;
+               _bestCompressionType = CompressionType.DDC;
                _minSize = 
ColGroupSizes.estimateInMemorySizeDDC(columns.length, numVals, numRows, 1.0, 
false);
                _map = null;
        }
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibAppend.java 
b/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibAppend.java
index fc086e6..ab60d9f 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibAppend.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibAppend.java
@@ -22,12 +22,10 @@ package org.apache.sysds.runtime.compress.lib;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.sysds.runtime.compress.CompressedMatrixBlock;
 import org.apache.sysds.runtime.compress.CompressedMatrixBlockFactory;
-import org.apache.sysds.runtime.compress.CompressionStatistics;
 import org.apache.sysds.runtime.compress.colgroup.AColGroup;
 import org.apache.sysds.runtime.compress.colgroup.ColGroupEmpty;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
@@ -41,24 +39,18 @@ public class CLALibAppend {
                if(left.isEmpty() && right instanceof CompressedMatrixBlock)
                        return appendLeftEmpty(left, (CompressedMatrixBlock) 
right);
                else if(right.isEmpty() && left instanceof 
CompressedMatrixBlock)
-                       return appendRightEmpty((CompressedMatrixBlock)left, 
right);
+                       return appendRightEmpty((CompressedMatrixBlock) left, 
right);
 
                final int m = left.getNumRows();
                final int n = left.getNumColumns() + right.getNumColumns();
 
-               // try to compress both sides (if not already compressed).
                if(!(left instanceof CompressedMatrixBlock) && m > 1000) {
-                       LOG.warn("Compressing left for append operation");
-                       Pair<MatrixBlock, CompressionStatistics> x = 
CompressedMatrixBlockFactory.compress(left);
-                       if(x.getRight().getRatio() > 3.0)
-                               left = x.getLeft();
-
+                       LOG.info("Appending uncompressed column group left");
+                       left = 
CompressedMatrixBlockFactory.genUncompressedCompressedMatrixBlock(left);
                }
                if(!(right instanceof CompressedMatrixBlock) && m > 1000) {
-                       LOG.warn("Compressing right for append operation");
-                       Pair<MatrixBlock, CompressionStatistics> x = 
CompressedMatrixBlockFactory.compress(right);
-                       if(x.getRight().getRatio() > 3.0)
-                               right = x.getLeft();
+                       LOG.warn("Appending uncompressed column group right");
+                       left = 
CompressedMatrixBlockFactory.genUncompressedCompressedMatrixBlock(right);
                }
 
                // if compression failed then use default append method.
@@ -72,14 +64,22 @@ public class CLALibAppend {
                CompressedMatrixBlock ret = new CompressedMatrixBlock(m, n);
 
                ret = appendColGroups(ret, leftC.getColGroups(), 
rightC.getColGroups(), leftC.getNumColumns());
-               return ret;
+
+               double compressedSize = ret.getInMemorySize();
+               double uncompressedSize = MatrixBlock.estimateSizeInMemory(m,n, 
ret.getSparsity());
+
+               
+               if(compressedSize * 10 < uncompressedSize)
+                       return ret;
+               else
+                       return ret.getUncompressed("Decompressing c bind 
matrix");
        }
 
        private static MatrixBlock appendRightEmpty(CompressedMatrixBlock left, 
MatrixBlock right) {
 
                final int m = left.getNumRows();
                final int n = left.getNumColumns() + right.getNumColumns();
-               CompressedMatrixBlock ret = new CompressedMatrixBlock(m,n);
+               CompressedMatrixBlock ret = new CompressedMatrixBlock(m, n);
 
                List<AColGroup> newGroup = new ArrayList<>(1);
                newGroup.add(ColGroupEmpty.generate(right.getNumColumns(), 
right.getNumRows()));
@@ -91,7 +91,7 @@ public class CLALibAppend {
        private static MatrixBlock appendLeftEmpty(MatrixBlock left, 
CompressedMatrixBlock right) {
                final int m = left.getNumRows();
                final int n = left.getNumColumns() + right.getNumColumns();
-               CompressedMatrixBlock ret = new CompressedMatrixBlock(m,n);
+               CompressedMatrixBlock ret = new CompressedMatrixBlock(m, n);
 
                List<AColGroup> newGroup = new ArrayList<>(1);
                newGroup.add(ColGroupEmpty.generate(left.getNumColumns(), 
left.getNumRows()));
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibBinaryCellOp.java 
b/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibBinaryCellOp.java
index 79cf1c3..99d9c92 100644
--- 
a/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibBinaryCellOp.java
+++ 
b/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibBinaryCellOp.java
@@ -19,7 +19,6 @@
 
 package org.apache.sysds.runtime.compress.lib;
 
-import java.lang.ref.SoftReference;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.Callable;
@@ -59,33 +58,41 @@ public class CLALibBinaryCellOp {
 
        private static final Log LOG = 
LogFactory.getLog(CLALibBinaryCellOp.class.getName());
 
-       public static MatrixBlock binaryOperations(BinaryOperator op, 
CompressedMatrixBlock m1, MatrixValue thatValue,
-               MatrixValue result) {
+       public static MatrixBlock binaryOperations(BinaryOperator op, 
CompressedMatrixBlock m1, MatrixBlock thatValue,
+               MatrixBlock result) {
                MatrixBlock that = 
CompressedMatrixBlock.getUncompressed(thatValue, "Decompressing right side in 
BinaryOps");
                if(m1.getNumRows() <= 0)
                        LOG.error(m1);
-               if(thatValue.getNumRows() <= 0)
-                       LOG.error(thatValue);
+               if(that.getNumRows() <= 0)
+                       LOG.error(that);
                LibMatrixBincell.isValidDimensionsBinary(m1, that);
-               thatValue = that;
                BinaryAccessType atype = 
LibMatrixBincell.getBinaryAccessType(m1, that);
-               return selectProcessingBasedOnAccessType(op, m1, thatValue, 
result, atype, false);
+               return selectProcessingBasedOnAccessType(op, m1, that, result, 
atype, false);
        }
 
-       public static MatrixBlock binaryOperationsLeft(BinaryOperator op, 
CompressedMatrixBlock m1, MatrixValue thatValue,
-               MatrixValue result) {
+       public static MatrixBlock binaryOperationsLeft(BinaryOperator op, 
CompressedMatrixBlock m1, MatrixBlock thatValue,
+               MatrixBlock result) {
                MatrixBlock that = 
CompressedMatrixBlock.getUncompressed(thatValue, "Decompressing left side in 
BinaryOps");
                LibMatrixBincell.isValidDimensionsBinary(that, m1);
                thatValue = that;
                BinaryAccessType atype = 
LibMatrixBincell.getBinaryAccessType(that, m1);
-               return selectProcessingBasedOnAccessType(op, m1, thatValue, 
result, atype, true);
+               return selectProcessingBasedOnAccessType(op, m1, that, result, 
atype, true);
        }
 
        private static MatrixBlock 
selectProcessingBasedOnAccessType(BinaryOperator op, CompressedMatrixBlock m1,
-               MatrixValue thatValue, MatrixValue result, BinaryAccessType 
atype, boolean left) {
-               MatrixBlock that = (MatrixBlock) thatValue;
-               if(atype == BinaryAccessType.MATRIX_COL_VECTOR)
-                       return binaryMVCol(m1, that, op, left);
+               MatrixBlock that, MatrixBlock result, BinaryAccessType atype, 
boolean left) {
+               if(atype == BinaryAccessType.MATRIX_COL_VECTOR) {
+                       MatrixBlock d_compressed = m1.getCachedDecompressed();
+                       if(d_compressed != null) {
+                               if(left)
+                                       return that.binaryOperations(op, 
d_compressed, result);
+                               else
+                                       return 
d_compressed.binaryOperations(op, that, result);
+                       }
+                       else
+                               return binaryMVCol(m1, that, op, left);
+
+               }
                else if(atype == BinaryAccessType.MATRIX_MATRIX) {
                        if(that.isEmpty()) {
                                ScalarOperator sop = left ? new 
LeftScalarOperator(op.fn, 0, -1) : new RightScalarOperator(op.fn, 0,
@@ -93,8 +100,7 @@ public class CLALibBinaryCellOp {
                                return CLALibScalar.scalarOperations(sop, m1, 
result);
                        }
                        else {
-                               SoftReference<MatrixBlock> msf = 
m1.getSoftReferenceToDecompressed();
-                               MatrixBlock d_compressed = msf != null ? 
msf.get() : null;
+                               MatrixBlock d_compressed = 
m1.getCachedDecompressed();
                                if(d_compressed != null) {
                                        // copy the decompressed matrix if 
there is a decompressed matrix already.
                                        MatrixBlock tmp = d_compressed;
@@ -117,7 +123,7 @@ public class CLALibBinaryCellOp {
                        return bincellOp(m1, that, 
setupCompressedReturnMatrixBlock(m1, result), op, left);
                else {
                        LOG.warn("Decompressing since Binary Ops" + op.fn + " 
is not supported compressed");
-                       return 
CompressedMatrixBlock.getUncompressed(m1).binaryOperations(op, thatValue, 
result);
+                       return 
CompressedMatrixBlock.getUncompressed(m1).binaryOperations(op, that, result);
                }
        }
 
@@ -295,7 +301,7 @@ public class CLALibBinaryCellOp {
                final int blkz = CompressionSettings.BITMAP_BLOCK_SZ;
                final int k = op.getNumThreads();
                long nnz = 0;
-               ;
+
                if(k <= 1) {
                        for(int i = 0; i * blkz < m1.getNumRows(); i++) {
                                if(left)
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibCompAgg.java 
b/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibCompAgg.java
index 892ec95..5c9f7b2 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibCompAgg.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibCompAgg.java
@@ -34,6 +34,7 @@ import 
org.apache.sysds.runtime.compress.CompressedMatrixBlock;
 import org.apache.sysds.runtime.compress.CompressionSettings;
 import org.apache.sysds.runtime.compress.DMLCompressionException;
 import org.apache.sysds.runtime.compress.colgroup.AColGroup;
+import 
org.apache.sysds.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
 import org.apache.sysds.runtime.functionobjects.Builtin;
 import org.apache.sysds.runtime.functionobjects.Builtin.BuiltinCode;
 import org.apache.sysds.runtime.functionobjects.IndexFunction;
@@ -50,7 +51,6 @@ import org.apache.sysds.runtime.matrix.data.LibMatrixAgg;
 import org.apache.sysds.runtime.matrix.data.LibMatrixBincell;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 import org.apache.sysds.runtime.matrix.data.MatrixIndexes;
-import org.apache.sysds.runtime.matrix.data.MatrixValue;
 import org.apache.sysds.runtime.matrix.data.MatrixValue.CellIndex;
 import org.apache.sysds.runtime.matrix.operators.AggregateOperator;
 import org.apache.sysds.runtime.matrix.operators.AggregateUnaryOperator;
@@ -71,43 +71,64 @@ public class CLALibCompAgg {
                }
        };
 
-       public static MatrixBlock aggregateUnary(CompressedMatrixBlock 
inputMatrix, MatrixValue result,
+       public static MatrixBlock aggregateUnary(CompressedMatrixBlock 
inputMatrix, MatrixBlock result,
                AggregateUnaryOperator op, int blen, MatrixIndexes indexesIn, 
boolean inCP) {
 
                // prepare output dimensions
                CellIndex tempCellIndex = new CellIndex(-1, -1);
                op.indexFn.computeDimension(inputMatrix.getNumRows(), 
inputMatrix.getNumColumns(), tempCellIndex);
 
+               if(requireDecompression(inputMatrix, op)) {
+                       // Decide if we should use the cached decompressed 
version, or if we should decompress.
+                       final double denseSize = 
MatrixBlock.estimateSizeDenseInMemory(inputMatrix.getNumRows(),
+                               inputMatrix.getNumColumns());
+                       final double currentSize = 
inputMatrix.getInMemorySize();
+                       final double localMaxMemory = 
InfrastructureAnalyzer.getLocalMaxMemory();
+
+                       if(denseSize < 5 * currentSize && 
inputMatrix.getColGroups().size() > 5 &&
+                               denseSize <= localMaxMemory / 2) {
+                               LOG.info("Decompressing for unaryAggregate 
because of overlapping state");
+                               inputMatrix.decompress(op.getNumThreads());
+                       }
+                       MatrixBlock decomp = 
inputMatrix.getCachedDecompressed();
+                       if(decomp != null)
+                               return decomp.aggregateUnaryOperations(op, 
result, blen, indexesIn, inCP);
+               }
+               
                // initialize and allocate the result
                if(result == null)
                        result = new MatrixBlock(tempCellIndex.row, 
tempCellIndex.column, false);
                else
                        result.reset(tempCellIndex.row, tempCellIndex.column, 
false);
-               MatrixBlock ret = (MatrixBlock) result;
 
-               ret.allocateDenseBlock();
+               result.allocateDenseBlock();
 
                AggregateUnaryOperator opm = replaceKahnOperations(op);
 
                if(inputMatrix.getColGroups() != null) {
-                       fillStart(ret, opm);
+                       fillStart(result, opm);
 
-                       if(inputMatrix.isOverlapping() &&
-                               (opm.aggOp.increOp.fn instanceof KahanPlusSq || 
(opm.aggOp.increOp.fn instanceof Builtin &&
-                                       (((Builtin) 
opm.aggOp.increOp.fn).getBuiltinCode() == BuiltinCode.MIN ||
-                                               ((Builtin) 
opm.aggOp.increOp.fn).getBuiltinCode() == BuiltinCode.MAX))))
-                               aggregateUnaryOverlapping(inputMatrix, ret, 
opm, indexesIn, inCP);
+                       if(requireDecompression(inputMatrix, opm))
+                               aggregateUnaryOverlapping(inputMatrix, result, 
opm, indexesIn, inCP);
                        else
-                               
aggregateUnaryNormalCompressedMatrixBlock(inputMatrix, ret, opm, blen, 
indexesIn, inCP);
+                               
aggregateUnaryNormalCompressedMatrixBlock(inputMatrix, result, opm, blen, 
indexesIn, inCP);
                }
-               ret.recomputeNonZeros();
+               
+               result.recomputeNonZeros();
                if(op.aggOp.existsCorrection() && !inCP) {
-                       ret = addCorrection(ret, op);
+                       result = addCorrection(result, op);
                        if(op.aggOp.increOp.fn instanceof Mean)
-                               ret = addCellCount(ret, op, 
inputMatrix.getNumRows(), inputMatrix.getNumColumns());
+                               result = addCellCount(result, op, 
inputMatrix.getNumRows(), inputMatrix.getNumColumns());
                }
-               return ret;
+               return result;
+
+       }
 
+       private static boolean requireDecompression(CompressedMatrixBlock 
inputMatrix, AggregateUnaryOperator op) {
+               return inputMatrix.isOverlapping() &&
+                       (op.aggOp.increOp.fn instanceof KahanPlusSq || 
(op.aggOp.increOp.fn instanceof Builtin &&
+                               (((Builtin) 
op.aggOp.increOp.fn).getBuiltinCode() == BuiltinCode.MIN ||
+                                       ((Builtin) 
op.aggOp.increOp.fn).getBuiltinCode() == BuiltinCode.MAX)));
        }
 
        private static MatrixBlock addCorrection(MatrixBlock ret, 
AggregateUnaryOperator op) {
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibRelationalOp.java 
b/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibRelationalOp.java
deleted file mode 100644
index d4fbb7e..0000000
--- 
a/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibRelationalOp.java
+++ /dev/null
@@ -1,267 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-// package org.apache.sysds.runtime.compress.lib;
-
-// import java.util.ArrayList;
-// import java.util.Arrays;
-// import java.util.List;
-// import java.util.concurrent.Callable;
-// import java.util.concurrent.ExecutionException;
-// import java.util.concurrent.ExecutorService;
-// import java.util.concurrent.Future;
-
-// import org.apache.sysds.hops.OptimizerUtils;
-// import org.apache.sysds.runtime.DMLRuntimeException;
-// import org.apache.sysds.runtime.compress.CompressedMatrixBlock;
-// import org.apache.sysds.runtime.compress.CompressionSettings;
-// import org.apache.sysds.runtime.compress.colgroup.AColGroup;
-// import org.apache.sysds.runtime.compress.colgroup.ColGroupConst;
-// import org.apache.sysds.runtime.compress.colgroup.dictionary.Dictionary;
-// import org.apache.sysds.runtime.functionobjects.Equals;
-// import org.apache.sysds.runtime.functionobjects.GreaterThan;
-// import org.apache.sysds.runtime.functionobjects.GreaterThanEquals;
-// import org.apache.sysds.runtime.functionobjects.LessThan;
-// import org.apache.sysds.runtime.functionobjects.LessThanEquals;
-// import org.apache.sysds.runtime.functionobjects.NotEquals;
-// import org.apache.sysds.runtime.matrix.data.MatrixBlock;
-// import org.apache.sysds.runtime.matrix.operators.LeftScalarOperator;
-// import org.apache.sysds.runtime.matrix.operators.RightScalarOperator;
-// import org.apache.sysds.runtime.matrix.operators.ScalarOperator;
-// import org.apache.sysds.runtime.util.CommonThreadPool;
-
-// /**
-//  * This class is used for relational operators that return binary values 
depending on individual cells values in the
-//  * compression. This indicate that the resulting vectors/matrices are 
amenable to compression since they only contain
-//  * two distinct values, true or false.
-//  * 
-//  */
-// public class CLALibRelationalOp {
-//     // private static final Log LOG = 
LogFactory.getLog(LibRelationalOp.class.getName());
-
-//     /** Thread pool matrix Block for materializing decompressed groups. */
-//     private static ThreadLocal<MatrixBlock> memPool = new 
ThreadLocal<MatrixBlock>() {
-//             @Override
-//             protected MatrixBlock initialValue() {
-//                     return null;
-//             }
-//     };
-
-//     protected static boolean isValidForRelationalOperation(ScalarOperator 
sop, CompressedMatrixBlock m1) {
-//             return m1.isOverlapping() &&
-//                     (sop.fn instanceof LessThan || sop.fn instanceof 
LessThanEquals || sop.fn instanceof GreaterThan ||
-//                             sop.fn instanceof GreaterThanEquals || sop.fn 
instanceof Equals || sop.fn instanceof NotEquals);
-//     }
-
-//     // public static MatrixBlock 
overlappingRelativeRelationalOperation(ScalarOperator sop, 
CompressedMatrixBlock m1) {
-
-//     //      List<AColGroup> colGroups = m1.getColGroups();
-//     //      boolean less = ((sop.fn instanceof LessThan || sop.fn 
instanceof LessThanEquals) &&
-//     //              sop instanceof LeftScalarOperator) ||
-//     //              (sop instanceof RightScalarOperator &&
-//     //                      (sop.fn instanceof GreaterThan || sop.fn 
instanceof GreaterThanEquals));
-//     //      double v = sop.getConstant();
-//     //      double min = m1.min();
-//     //      double max = m1.max();
-
-//     //      // Shortcut:
-//     //      // If we know worst case min and worst case max and the values 
to compare to in all cases is
-//     //      // less then or greater than worst then we can return a full 
matrix with either 1 or 0.
-
-//     //      if(v < min || v > max) {
-//     //              if(sop.fn instanceof Equals) {
-//     //                      return makeConstZero(m1.getNumRows(), 
m1.getNumColumns());
-//     //              }
-//     //              else if(sop.fn instanceof NotEquals) {
-//     //                      return makeConstOne(m1.getNumRows(), 
m1.getNumColumns());
-//     //              }
-//     //              else if(less) {
-//     //                      if(v < min || ((sop.fn instanceof 
LessThanEquals || sop.fn instanceof GreaterThan) && v <= min))
-//     //                              return makeConstOne(m1.getNumRows(), 
m1.getNumColumns());
-//     //                      else
-//     //                              return makeConstZero(m1.getNumRows(), 
m1.getNumColumns());
-//     //              }
-//     //              else {
-//     //                      if(v > max || ((sop.fn instanceof 
LessThanEquals || sop.fn instanceof GreaterThan) && v >= max))
-//     //                              return makeConstOne(m1.getNumRows(), 
m1.getNumColumns());
-//     //                      else
-//     //                              return makeConstZero(m1.getNumRows(), 
m1.getNumColumns());
-//     //              }
-//     //      }
-//     //      else {
-//     //              return processNonConstant(sop, minMax, min, max, 
m1.getNumRows(), m1.getNumColumns(), less);
-//     //      }
-
-//     // }
-
-//     private static MatrixBlock makeConstOne(int rows, int cols) {
-//     //      List<AColGroup> newColGroups = new ArrayList<>();
-//     //      int[] colIndexes = new int[cols];
-//     //      for(int i = 0; i < colIndexes.length; i++) {
-//     //              colIndexes[i] = i;
-//     //      }
-//     //      double[] values = new double[cols];
-//     //      Arrays.fill(values, 1);
-
-//     //      newColGroups.add(new ColGroupConst(colIndexes, rows, new 
Dictionary(values)));
-//     //      CompressedMatrixBlock ret = new CompressedMatrixBlock(rows, 
cols);
-//     //      ret.allocateColGroupList(newColGroups);
-//     //      ret.setNonZeros(cols * rows);
-//     //      ret.setOverlapping(false);
-//     //      return ret;
-//     // }
-
-//     // private static MatrixBlock makeConstZero(int rows, int cols) {
-//     //      MatrixBlock sb = new MatrixBlock(rows, cols, true, 0);
-//     //      return sb;
-//     // }
-
-//     // private static MatrixBlock processNonConstant(ScalarOperator sop, 
MinMaxGroup[] minMax, double minS, double maxS,
-//     //      final int rows, final int cols, boolean less) {
-
-//     //      // BitSet res = new BitSet(ret.getNumColumns() * 
ret.getNumRows());
-//     //      MatrixBlock res = new MatrixBlock(rows, cols, true, 
0).allocateBlock();
-//     //      int k = OptimizerUtils.getConstrainedNumThreads(-1);
-//     //      int outRows = rows;
-//     //      long nnz = 0;
-//     //      if(k == 1) {
-//     //              final int b = CompressionSettings.BITMAP_BLOCK_SZ / 
cols;
-//     //              final int blkz = (outRows < b) ? outRows : b;
-
-//     //              MatrixBlock tmp = new MatrixBlock(blkz, cols, false, 
-1).allocateBlock();
-//     //              for(int i = 0; i * blkz < outRows; i++) {
-//     //                      for(MinMaxGroup mmg : minMax) 
-//     //                              mmg.g.decompressToBlockUnSafe(tmp, i * 
blkz, Math.min((i + 1) * blkz, rows), 0);
-                               
-//     //                      for(int row = 0; row < blkz && row < rows - i * 
blkz; row++) {
-//     //                              int off = (row + i * blkz);
-//     //                              for(int col = 0; col < cols; col++) {
-//     //                                      res.quickSetValue(off, col, 
sop.executeScalar(tmp.quickGetValue(row, col)));
-//     //                                      if(res.quickGetValue(off, col) 
!= 0) {
-//     //                                              nnz++;
-//     //                                      }
-//     //                              }
-//     //                      }
-//     //              }
-//     //              tmp.reset();
-//     //              res.setNonZeros(nnz);
-//     //      }
-//     //      else {
-//     //              final int blkz = CompressionSettings.BITMAP_BLOCK_SZ / 
2;
-//     //              ExecutorService pool = CommonThreadPool.get(k);
-//     //              ArrayList<RelationalTask> tasks = new ArrayList<>();
-
-//     //              try {
-//     //                      for(int i = 0; i * blkz < outRows; i++) {
-//     //                              RelationalTask rt = new 
RelationalTask(minMax, i, blkz, res, rows, cols, sop);
-//     //                              tasks.add(rt);
-//     //                      }
-//     //                      List<Future<Object>> futures = 
pool.invokeAll(tasks);
-//     //                      pool.shutdown();
-//     //                      for(Future<Object> f : futures)
-//     //                              f.get();
-//     //              }
-//     //              catch(InterruptedException | ExecutionException e) {
-//     //                      e.printStackTrace();
-//     //                      throw new DMLRuntimeException(e);
-//     //              }
-
-//     //      }
-//     //      memPool.remove();
-
-//     //      return res;
-//     // }
-
-//     // protected static class MinMaxGroup implements 
Comparable<MinMaxGroup> {
-//     //      double min;
-//     //      double max;
-//     //      AColGroup g;
-//     //      double[] values;
-
-//     //      public MinMaxGroup(double min, double max, AColGroup g) {
-//     //              this.min = min;
-//     //              this.max = max;
-//     //              this.g = g;
-
-//     //              this.values = g.getValues();
-//     //      }
-
-//     //      @Override
-//     //      public int compareTo(MinMaxGroup o) {
-//     //              double t = max - min;
-//     //              double ot = o.max - o.min;
-//     //              return Double.compare(t, ot);
-//     //      }
-
-//     //      @Override
-//     //      public String toString() {
-//     //              StringBuilder sb = new StringBuilder();
-//     //              sb.append("MMG: ");
-//     //              sb.append("[" + min + "," + max + "]");
-//     //              sb.append(" " + g.getClass().getSimpleName());
-//     //              return sb.toString();
-//     //      }
-//     // }
-
-//     // private static class RelationalTask implements Callable<Object> {
-//     //      private final MinMaxGroup[] _minMax;
-//     //      private final int _i;
-//     //      private final int _blkz;
-//     //      private final MatrixBlock _res;
-//     //      private final int _rows;
-//     //      private final int _cols;
-//     //      private final ScalarOperator _sop;
-
-//     //      protected RelationalTask(MinMaxGroup[] minMax, int i, int blkz, 
MatrixBlock res, int rows, int cols,
-//     //              ScalarOperator sop) {
-//     //              _minMax = minMax;
-//     //              _i = i;
-//     //              _blkz = blkz;
-//     //              _res = res;
-//     //              _rows = rows;
-//     //              _cols = cols;
-//     //              _sop = sop;
-//     //      }
-
-//     //      @Override
-//     //      public Object call() {
-//     //              MatrixBlock tmp = memPool.get();
-//     //              if(tmp == null) {
-//     //                      memPool.set(new MatrixBlock(_blkz, _cols, 
false, -1).allocateBlock());
-//     //                      tmp = memPool.get();
-//     //              }
-//     //              else {
-//     //                      tmp = memPool.get();
-//     //                      tmp.reset(_blkz, _cols, false, -1);
-//     //              }
-
-//     //              for(MinMaxGroup mmg : _minMax) {
-//     //                      if(mmg.g.getNumberNonZeros() != 0)
-//     //                              mmg.g.decompressToBlockUnSafe(tmp, _i * 
_blkz, Math.min((_i + 1) * _blkz, mmg.g.getNumRows()), 0);
-//     //              }
-
-//     //              for(int row = 0, off = _i * _blkz; row < _blkz && row < 
_rows - _i * _blkz; row++, off++) {
-//     //                      for(int col = 0; col < _cols; col++) {
-//     //                              _res.appendValue(off, col, 
_sop.executeScalar(tmp.quickGetValue(row, col)));
-//     //                      }
-//     //              }
-//     //              return null;
-//     //      }
-//     // }
-// }
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibScalar.java 
b/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibScalar.java
index aea5488..1822685 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibScalar.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibScalar.java
@@ -56,13 +56,6 @@ public class CLALibScalar {
        private static final int MINIMUM_PARALLEL_SIZE = 8096;
 
        public static MatrixBlock scalarOperations(ScalarOperator sop, 
CompressedMatrixBlock m1, MatrixValue result) {
-               // Special case handling of overlapping relational operations
-               // if(CLALibRelationalOp.isValidForRelationalOperation(sop, 
m1)) {
-               //      MatrixBlock ret =  
CLALibRelationalOp.overlappingRelativeRelationalOperation(sop, m1);
-               //      ret.recomputeNonZeros();
-               //      return ret;
-               // }
-
                if(isInvalidForCompressedOutput(m1, sop)) {
                        LOG.warn("scalar overlapping not supported for op: " + 
sop.fn);
                        MatrixBlock m1d = m1.decompress(sop.getNumThreads());
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/workload/WorkloadAnalyzer.java
 
b/src/main/java/org/apache/sysds/runtime/compress/workload/WorkloadAnalyzer.java
index b37acc1..31b3714 100644
--- 
a/src/main/java/org/apache/sysds/runtime/compress/workload/WorkloadAnalyzer.java
+++ 
b/src/main/java/org/apache/sysds/runtime/compress/workload/WorkloadAnalyzer.java
@@ -404,7 +404,10 @@ public class WorkloadAnalyzer {
                                ArrayList<Hop> in = hop.getInput();
                                if(isOverlapping(in.get(0)) || 
isOverlapping(in.get(1)))
                                        overlapping.add(hop.getHopID());
-                               return new OpNormal(hop, true);
+                               // CBind is in worst case decompressing, but 
can be compressing the other side if it is trivially compressible.
+                               // to make the optimizer correct we need to 
mark this operation as decompressing, since it is the worst possible outcome.
+                               // Currently we don't optimize for operations 
that are located past a cbind.
+                               return new OpDecompressing(hop);
                        }
                        else if(HopRewriteUtils.isBinary(hop, OpOp2.RBIND)) {
                                ArrayList<Hop> in = hop.getInput();
diff --git 
a/src/main/java/org/apache/sysds/runtime/controlprogram/context/SparkExecutionContext.java
 
b/src/main/java/org/apache/sysds/runtime/controlprogram/context/SparkExecutionContext.java
index 880f31f..faef2c7 100644
--- 
a/src/main/java/org/apache/sysds/runtime/controlprogram/context/SparkExecutionContext.java
+++ 
b/src/main/java/org/apache/sysds/runtime/controlprogram/context/SparkExecutionContext.java
@@ -66,6 +66,7 @@ import org.apache.sysds.runtime.data.SparseBlock;
 import org.apache.sysds.runtime.data.TensorBlock;
 import org.apache.sysds.runtime.data.TensorIndexes;
 import org.apache.sysds.runtime.instructions.cp.Data;
+import org.apache.sysds.runtime.instructions.spark.DeCompressionSPInstruction;
 import org.apache.sysds.runtime.instructions.spark.data.BroadcastObject;
 import org.apache.sysds.runtime.instructions.spark.data.LineageObject;
 import org.apache.sysds.runtime.instructions.spark.data.PartitionedBlock;
@@ -1280,16 +1281,21 @@ public class SparkExecutionContext extends 
ExecutionContext
                return out;
        }
 
-       @SuppressWarnings("unchecked")
+       // @SuppressWarnings("unchecked")
        public static long writeMatrixRDDtoHDFS( RDDObject rdd, String path, 
FileFormat fmt )
        {
                JavaPairRDD<MatrixIndexes,MatrixBlock> lrdd = 
(JavaPairRDD<MatrixIndexes, MatrixBlock>) rdd.getRDD();
                InputOutputInfo oinfo = InputOutputInfo.get(DataType.MATRIX, 
fmt);
                
+               // if compression is enabled decompress all blocks before 
writing to disk TEMPORARY MODIFICATION UNTIL MATRIXBLOCK IS MERGED WITH 
COMPRESSEDMATRIXBLOCK
+               if(ConfigurationManager.isCompressionEnabled())
+                       lrdd = lrdd.mapValues(new 
DeCompressionSPInstruction.DeCompressionFunction());
+
                //piggyback nnz maintenance on write
                LongAccumulator aNnz = 
getSparkContextStatic().sc().longAccumulator("nnz");
                lrdd = lrdd.mapValues(new ComputeBinaryBlockNnzFunction(aNnz));
 
+
                //save file is an action which also triggers nnz maintenance
                lrdd.saveAsHadoopFile(path,
                        oinfo.keyClass,
diff --git 
a/src/main/java/org/apache/sysds/runtime/instructions/spark/BinUaggChainSPInstruction.java
 
b/src/main/java/org/apache/sysds/runtime/instructions/spark/BinUaggChainSPInstruction.java
index 4ea0284..39b2408 100644
--- 
a/src/main/java/org/apache/sysds/runtime/instructions/spark/BinUaggChainSPInstruction.java
+++ 
b/src/main/java/org/apache/sysds/runtime/instructions/spark/BinUaggChainSPInstruction.java
@@ -30,6 +30,12 @@ import org.apache.sysds.runtime.matrix.data.MatrixIndexes;
 import org.apache.sysds.runtime.matrix.operators.AggregateUnaryOperator;
 import org.apache.sysds.runtime.matrix.operators.BinaryOperator;
 
+/**
+ * Instruction that performs
+ * 
+ * res = X / rowsum(X)
+ * 
+ */
 public class BinUaggChainSPInstruction extends UnarySPInstruction {
        // operators
        private BinaryOperator _bOp = null;
diff --git 
a/src/test/java/org/apache/sysds/test/component/compress/CompressedMatrixTest.java
 
b/src/test/java/org/apache/sysds/test/component/compress/CompressedMatrixTest.java
index 6115654..a998a74 100644
--- 
a/src/test/java/org/apache/sysds/test/component/compress/CompressedMatrixTest.java
+++ 
b/src/test/java/org/apache/sysds/test/component/compress/CompressedMatrixTest.java
@@ -29,6 +29,7 @@ import java.io.DataOutputStream;
 import java.util.Collection;
 
 import org.apache.commons.math3.random.Well1024a;
+import org.apache.sysds.common.Types.CorrectionLocationType;
 import org.apache.sysds.conf.ConfigurationManager;
 import org.apache.sysds.runtime.DMLRuntimeException;
 import org.apache.sysds.runtime.compress.CompressedMatrixBlock;
@@ -36,10 +37,15 @@ import 
org.apache.sysds.runtime.compress.CompressionSettingsBuilder;
 import org.apache.sysds.runtime.compress.CompressionStatistics;
 import org.apache.sysds.runtime.compress.colgroup.AColGroup;
 import org.apache.sysds.runtime.compress.colgroup.AColGroup.CompressionType;
+import org.apache.sysds.runtime.functionobjects.KahanPlus;
+import org.apache.sysds.runtime.functionobjects.Multiply;
+import org.apache.sysds.runtime.functionobjects.ReduceAll;
 import org.apache.sysds.runtime.matrix.data.LibMatrixCountDistinct;
 import org.apache.sysds.runtime.matrix.data.LibMatrixDatagen;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 import org.apache.sysds.runtime.matrix.data.RandomMatrixGenerator;
+import org.apache.sysds.runtime.matrix.operators.AggregateOperator;
+import org.apache.sysds.runtime.matrix.operators.AggregateTernaryOperator;
 import org.apache.sysds.runtime.matrix.operators.AggregateUnaryOperator;
 import org.apache.sysds.runtime.matrix.operators.CountDistinctOperator;
 import 
org.apache.sysds.runtime.matrix.operators.CountDistinctOperator.CountDistinctTypes;
@@ -570,6 +576,33 @@ public class CompressedMatrixTest extends 
AbstractCompressedUnaryTests {
                cmb.copy(cmb);
        }
 
+       @Test
+       public void testAggregateTernaryOperation() {
+               try {
+                       if(!(cmb instanceof CompressedMatrixBlock) || rows * 
cols > 10000)
+                               return;
+                       CorrectionLocationType corr = 
CorrectionLocationType.LASTCOLUMN;
+                       AggregateOperator agg = new AggregateOperator(0, 
KahanPlus.getKahanPlusFnObject(), corr);
+                       AggregateTernaryOperator op = new 
AggregateTernaryOperator(Multiply.getMultiplyFnObject(), agg,
+                               ReduceAll.getReduceAllFnObject());
+
+                       int nrow = mb.getNumRows();
+                       int ncol = mb.getNumColumns();
+
+                       MatrixBlock m2 = new MatrixBlock(nrow, ncol, 13.0);
+                       MatrixBlock m3 = new MatrixBlock(nrow, ncol, 14.0);
+
+                       MatrixBlock ret1 = cmb.aggregateTernaryOperations(cmb, 
m2, m3, null, op, true);
+                       MatrixBlock ret2 = mb.aggregateTernaryOperations(mb, 
m2, m3, null, op, true);
+
+                       compareResultMatrices(ret2, ret1, 1);
+               }
+               catch(Exception e) {
+                       e.printStackTrace();
+                       throw new DMLRuntimeException(e);
+               }
+       }
+
        private static long getJolSize(CompressedMatrixBlock cmb, 
CompressionStatistics cStat) {
                Layouter l = new HotSpotLayouter(new X86_64_DataModel());
                long jolEstimate = 0;
diff --git 
a/src/test/java/org/apache/sysds/test/component/compress/CompressedTestBase.java
 
b/src/test/java/org/apache/sysds/test/component/compress/CompressedTestBase.java
index eb5ceca..c291235 100644
--- 
a/src/test/java/org/apache/sysds/test/component/compress/CompressedTestBase.java
+++ 
b/src/test/java/org/apache/sysds/test/component/compress/CompressedTestBase.java
@@ -31,7 +31,6 @@ import java.util.List;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.sysds.common.Types.CorrectionLocationType;
 import org.apache.sysds.lops.MMTSJ.MMTSJType;
 import org.apache.sysds.lops.MapMultChain.ChainType;
 import org.apache.sysds.runtime.DMLRuntimeException;
@@ -55,22 +54,18 @@ import org.apache.sysds.runtime.functionobjects.Divide;
 import org.apache.sysds.runtime.functionobjects.Equals;
 import org.apache.sysds.runtime.functionobjects.GreaterThan;
 import org.apache.sysds.runtime.functionobjects.GreaterThanEquals;
-import org.apache.sysds.runtime.functionobjects.KahanPlus;
 import org.apache.sysds.runtime.functionobjects.LessThan;
 import org.apache.sysds.runtime.functionobjects.LessThanEquals;
 import org.apache.sysds.runtime.functionobjects.Minus;
 import org.apache.sysds.runtime.functionobjects.Multiply;
 import org.apache.sysds.runtime.functionobjects.Plus;
 import org.apache.sysds.runtime.functionobjects.Power2;
-import org.apache.sysds.runtime.functionobjects.ReduceAll;
 import org.apache.sysds.runtime.functionobjects.SwapIndex;
 import org.apache.sysds.runtime.functionobjects.ValueFunction;
 import org.apache.sysds.runtime.functionobjects.Xor;
 import org.apache.sysds.runtime.instructions.InstructionUtils;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 import org.apache.sysds.runtime.matrix.operators.AggregateBinaryOperator;
-import org.apache.sysds.runtime.matrix.operators.AggregateOperator;
-import org.apache.sysds.runtime.matrix.operators.AggregateTernaryOperator;
 import org.apache.sysds.runtime.matrix.operators.BinaryOperator;
 import org.apache.sysds.runtime.matrix.operators.LeftScalarOperator;
 import org.apache.sysds.runtime.matrix.operators.ReorgOperator;
@@ -1253,28 +1248,6 @@ public abstract class CompressedTestBase extends 
TestBase {
        }
 
        @Test
-       public void aggregateTernaryOperations() {
-               if(!(cmb instanceof CompressedMatrixBlock) || rows * cols > 
10000)
-                       return;
-
-               MatrixBlock m1 = new MatrixBlock();
-               MatrixBlock m2 = new MatrixBlock();
-               MatrixBlock m3 = null;
-               CorrectionLocationType corr = CorrectionLocationType.LASTCOLUMN;
-               AggregateOperator agg = new AggregateOperator(0, 
KahanPlus.getKahanPlusFnObject(), corr);
-               AggregateTernaryOperator op = new 
AggregateTernaryOperator(Multiply.getMultiplyFnObject(), agg,
-                       ReduceAll.getReduceAllFnObject(), _k);
-
-               boolean inCP = true;
-
-               MatrixBlock ret1 = mb.aggregateTernaryOperations(m1, m2, m3, 
null, op, inCP);
-               MatrixBlock ret2 = cmb.aggregateTernaryOperations(m1, m2, m3, 
null, op, inCP);
-
-               compareResultMatrices(ret1, ret2, 1);
-
-       }
-
-       @Test
        public void unaryOperations() {
                if(!(cmb instanceof CompressedMatrixBlock) || 
cmb.getNumColumns() < 2)
                        return;
diff --git 
a/src/test/java/org/apache/sysds/test/component/compress/workload/WorkloadTest.java
 
b/src/test/java/org/apache/sysds/test/component/compress/workload/WorkloadTest.java
index aacb716..9422e58 100644
--- 
a/src/test/java/org/apache/sysds/test/component/compress/workload/WorkloadTest.java
+++ 
b/src/test/java/org/apache/sysds/test/component/compress/workload/WorkloadTest.java
@@ -119,7 +119,7 @@ public class WorkloadTest {
                args.put("$2", "FALSE");
                args.put("$3", "0");
 
-               tests.add(new Object[] {0, 0, 0, 1, 1, 1, 6, 0, true, false, 
"functions/lmDS.dml", args});
+               tests.add(new Object[] {0, 1, 0, 1, 1, 1, 6, 0, true, false, 
"functions/lmDS.dml", args});
                tests.add(new Object[] {0, 0, 0, 1, 0, 1, 0, 0, true, true, 
"functions/lmDS.dml", args});
                tests.add(new Object[] {0, 0, 0, 1, 10, 10, 1, 0, true, true, 
"functions/lmCG.dml", args});
 
@@ -134,15 +134,15 @@ public class WorkloadTest {
                args.put("$1", testFile);
                args.put("$2", "TRUE");
                args.put("$3", "1");
-               tests.add(new Object[] {0, 0, 1, 1, 1, 1, 1, 0, true, true, 
"functions/lmDS.dml", args});
-               tests.add(new Object[] {0, 0, 1, 1, 11, 10, 2, 0, true, true, 
"functions/lmCG.dml", args});
+               tests.add(new Object[] {0, 1, 0, 0, 0, 0, 1, 0, false, true, 
"functions/lmDS.dml", args});
+               tests.add(new Object[] {0, 1, 1, 1, 11, 10, 2, 0, true, true, 
"functions/lmCG.dml", args});
 
                args = new HashMap<>();
                args.put("$1", testFile);
                args.put("$2", "TRUE");
                args.put("$3", "2");
-               tests.add(new Object[] {0, 0, 1, 1, 1, 1, 3, 0, true, true, 
"functions/lmDS.dml", args});
-               tests.add(new Object[] {0, 0, 1, 1, 11, 10, 4, 0, true, true, 
"functions/lmCG.dml", args});
+               tests.add(new Object[] {0, 1, 0, 0, 0, 0, 1, 0, false, true, 
"functions/lmDS.dml", args});
+               tests.add(new Object[] {0, 1, 1, 1, 11, 10, 2, 0, true, true, 
"functions/lmCG.dml", args});
 
                args = new HashMap<>();
                args.put("$1", testFile);
@@ -176,7 +176,7 @@ public class WorkloadTest {
                        CostEstimatorBuilder ceb = new 
CostEstimatorBuilder(wtr);
                        InstructionTypeCounter itc = ceb.getCounter();
 
-                       verify(wtr, itc, ceb);
+                       verify(wtr, itc, ceb, scriptName, args);
                }
                catch(Exception e) {
                        e.printStackTrace();
@@ -184,9 +184,9 @@ public class WorkloadTest {
                }
        }
 
-       private void verify(WTreeRoot wtr, InstructionTypeCounter itc, 
CostEstimatorBuilder ceb) {
+       private void verify(WTreeRoot wtr, InstructionTypeCounter itc, 
CostEstimatorBuilder ceb, String name, Map<String, String> args) {
 
-               String errorString = wtr + "\n" + itc + " \n ";
+               String errorString = wtr + "\n" + itc + " \n " + name + "  -- " 
+ args +  "\n";
                Assert.assertEquals(errorString + "scans:", scans, 
itc.getScans());
                Assert.assertEquals(errorString + "decompressions", 
decompressions, itc.getDecompressions());
                Assert.assertEquals(errorString + "overlappingDecompressions", 
overlappingDecompressions,
@@ -197,7 +197,7 @@ public class WorkloadTest {
                        itc.getCompressedMultiplications());
                Assert.assertEquals(errorString + "dictionaryOps", 
dictionaryOps, itc.getDictionaryOps());
                Assert.assertEquals(errorString + "lookup", indexing, 
itc.getIndexing());
-               Assert.assertEquals(shouldCompress, ceb.shouldTryToCompress());
+               Assert.assertEquals(errorString + "Should Compresss", 
shouldCompress, ceb.shouldTryToCompress());
        }
 
        private static WTreeRoot getWorkloadTree(DMLProgram prog) {
diff --git 
a/src/test/java/org/apache/sysds/test/functions/compress/workload/WorkloadAlgorithmTest.java
 
b/src/test/java/org/apache/sysds/test/functions/compress/workload/WorkloadAlgorithmTest.java
index 9ffeb02..5de8880 100644
--- 
a/src/test/java/org/apache/sysds/test/functions/compress/workload/WorkloadAlgorithmTest.java
+++ 
b/src/test/java/org/apache/sysds/test/functions/compress/workload/WorkloadAlgorithmTest.java
@@ -73,7 +73,7 @@ public class WorkloadAlgorithmTest extends AutomatedTestBase {
 
        @Test
        public void testMLogRegCP() {
-               runWorkloadAnalysisTest(TEST_NAME1, ExecMode.HYBRID, 2, false);
+               runWorkloadAnalysisTest(TEST_NAME1, ExecMode.HYBRID, 1, false);
        }
 
        @Test

Reply via email to