Repository: systemml
Updated Branches:
  refs/heads/master aefab8f8c -> d90073d80


[SYSTEMML-2031] Fix robustness of compression w/ incompressible data

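This patch improves the robustness of matrix compression by returning
the uncompressed block if all columns are incompressible or the
compression ratio is below 1. This avoids the size and computation
overhead of an unnecessary indirection through a single uncompressed
column group. To support this, CompressedMatrixBlock.compress() now
returns the resulting block (either the compressed block or an
uncompressed copy), and callers must use that return value instead of
the receiver.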


Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/78586a13
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/78586a13
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/78586a13

Branch: refs/heads/master
Commit: 78586a13114b9fed585060e2ab8976ba6f9b50bd
Parents: aefab8f
Author: Matthias Boehm <[email protected]>
Authored: Wed Nov 29 20:28:31 2017 -0800
Committer: Matthias Boehm <[email protected]>
Committed: Wed Nov 29 22:30:53 2017 -0800

----------------------------------------------------------------------
 .../runtime/compress/CompressedMatrixBlock.java | 24 ++++++++++--
 .../cp/CompressionCPInstruction.java            |  8 ++--
 .../spark/CompressionSPInstruction.java         | 41 +++++++-------------
 .../sysml/runtime/matrix/data/MatrixBlock.java  |  3 +-
 4 files changed, 41 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/78586a13/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java 
b/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
index 58d008a..e1a6a7b 100644
--- a/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
@@ -214,21 +214,23 @@ public class CompressedMatrixBlock extends MatrixBlock 
implements Externalizable
         * +per column sparsity
         * 
         * @throws DMLRuntimeException if DMLRuntimeException occurs
+        * @return compressed matrix block or original block if incompressible
         */
-       public void compress() 
+       public MatrixBlock compress() 
                throws DMLRuntimeException
        {
                //default sequential execution
-               compress(1);
+               return compress(1);
        }
        
        /**
         * Compress block.
         * 
         * @param k  number of threads
+        * @return compressed matrix block or original block if incompressible
         * @throws DMLRuntimeException if DMLRuntimeException occurs
         */
-       public void compress(int k) 
+       public MatrixBlock compress(int k) 
                throws DMLRuntimeException 
        {
                //check for redundant compression
@@ -304,7 +306,13 @@ public class CompressedMatrixBlock extends MatrixBlock 
implements Externalizable
                        LOG.debug("Compression statistics:");
                        LOG.debug("--compression phase 1: "+_stats.timePhase1);
                }
-
+               
+               if( colsC.isEmpty() ) {
+                       if( LOG.isDebugEnabled() )
+                               LOG.debug("Abort block compression because all 
columns are incompressible.");
+                       return new MatrixBlock().copyShallow(this);
+               }
+               
                // PHASE 2: Grouping columns
                // Divide the bitmap columns into column groups.
                List<int[]> bitmapColGrps = 
PlanningCoCoder.findCocodesByPartitioning(
@@ -366,6 +374,12 @@ public class CompressedMatrixBlock extends MatrixBlock 
implements Externalizable
                _stats.size = estimateCompressedSizeInMemory();
                _stats.ratio= estimateSizeInMemory() / _stats.size;
                
+               if( _stats.ratio < 1 ) {
+                       if( LOG.isDebugEnabled() )
+                               LOG.debug("Abort block compression because 
compression ratio is less than 1.");
+                       return new MatrixBlock().copyShallow(this);
+               }
+               
                //final cleanup (discard uncompressed block)
                rawblock.cleanupBlock(true, true);
                this.cleanupBlock(true, true);
@@ -382,6 +396,8 @@ public class CompressedMatrixBlock extends MatrixBlock 
implements Externalizable
                        LOG.debug("--compressed size: "+_stats.size);
                        LOG.debug("--compression ratio: "+_stats.ratio);
                }
+               
+               return this;
        }
 
        public CompressionStatistics getCompressionStatistics() {

http://git-wip-us.apache.org/repos/asf/systemml/blob/78586a13/src/main/java/org/apache/sysml/runtime/instructions/cp/CompressionCPInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/cp/CompressionCPInstruction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/cp/CompressionCPInstruction.java
index d9f6c32..630cdc2 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/cp/CompressionCPInstruction.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/cp/CompressionCPInstruction.java
@@ -38,7 +38,7 @@ public class CompressionCPInstruction extends 
UnaryCPInstruction {
                throws DMLRuntimeException 
        {
                String[] parts = 
InstructionUtils.getInstructionPartsWithValueType(str);
-               String opcode = parts[0];               
+               String opcode = parts[0];
                CPOperand in1 = new CPOperand(parts[1]);
                CPOperand out = new CPOperand(parts[2]);
                
@@ -53,11 +53,11 @@ public class CompressionCPInstruction extends 
UnaryCPInstruction {
                MatrixBlock in = ec.getMatrixInput(input1.getName(), 
getExtendedOpcode());
                
                //compress the matrix block
-               CompressedMatrixBlock cmb = new CompressedMatrixBlock(in);
-               cmb.compress(OptimizerUtils.getConstrainedNumThreads(-1));
+               MatrixBlock out = new CompressedMatrixBlock(in)
+                       .compress(OptimizerUtils.getConstrainedNumThreads(-1));
                
                //set output and release input
                ec.releaseMatrixInput(input1.getName(), getExtendedOpcode());
-               ec.setMatrixOutput(output.getName(), cmb, getExtendedOpcode());
+               ec.setMatrixOutput(output.getName(), out, getExtendedOpcode());
        }
 }

http://git-wip-us.apache.org/repos/asf/systemml/blob/78586a13/src/main/java/org/apache/sysml/runtime/instructions/spark/CompressionSPInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/CompressionSPInstruction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/CompressionSPInstruction.java
index ac93266..cdf3f26 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/CompressionSPInstruction.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/CompressionSPInstruction.java
@@ -35,53 +35,42 @@ public class CompressionSPInstruction extends 
UnarySPInstruction {
 
        private CompressionSPInstruction(Operator op, CPOperand in, CPOperand 
out, String opcode, String istr) {
                super(op, in, out, opcode, istr);
-               _sptype = SPINSTRUCTION_TYPE.Reorg;
+               _sptype = SPINSTRUCTION_TYPE.Compression;
        }
 
        public static CompressionSPInstruction parseInstruction ( String str ) 
-               throws DMLRuntimeException 
+               throws DMLRuntimeException
        {
                InstructionUtils.checkNumFields(str, 2);
                String[] parts = 
InstructionUtils.getInstructionPartsWithValueType(str);
-               
-               String opcode = parts[0];
-               CPOperand in = new CPOperand(parts[1]);
-               CPOperand out = new CPOperand(parts[2]);
-
-               return new CompressionSPInstruction(null, in, out, opcode, str);
+               return new CompressionSPInstruction(null,
+                       new CPOperand(parts[1]), new CPOperand(parts[2]), 
parts[0], str);
        }
        
        @Override
        public void processInstruction(ExecutionContext ec)
-                       throws DMLRuntimeException 
+               throws DMLRuntimeException 
        {
                SparkExecutionContext sec = (SparkExecutionContext)ec;
-       
+               
                //get input rdd handle
-               JavaPairRDD<MatrixIndexes,MatrixBlock> in = 
-                               sec.getBinaryBlockRDDHandleForVariable( 
input1.getName() );
-       
+               JavaPairRDD<MatrixIndexes,MatrixBlock> in =
+                       sec.getBinaryBlockRDDHandleForVariable( 
input1.getName() );
+               
                //execute compression
-               JavaPairRDD<MatrixIndexes,MatrixBlock> out = 
-                               in.mapValues(new CompressionFunction());
-                       
+               JavaPairRDD<MatrixIndexes,MatrixBlock> out =
+                       in.mapValues(new CompressionFunction());
+               
                //set outputs
                sec.setRDDHandleForVariable(output.getName(), out);
                sec.addLineageRDD(input1.getName(), output.getName());
        }
 
-       public static class CompressionFunction implements 
Function<MatrixBlock,MatrixBlock> 
-       {       
+       public static class CompressionFunction implements 
Function<MatrixBlock,MatrixBlock> {
                private static final long serialVersionUID = 
-6528833083609423922L;
-
                @Override
-               public MatrixBlock call(MatrixBlock arg0) 
-                       throws Exception 
-               {
-                       CompressedMatrixBlock cmb = new 
CompressedMatrixBlock(arg0);
-                       cmb.compress();
-                       
-                       return cmb;
+               public MatrixBlock call(MatrixBlock arg0) throws Exception {
+                       return new CompressedMatrixBlock(arg0).compress();
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/systemml/blob/78586a13/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java 
b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
index 22b20e4..c893bc6 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
@@ -1349,7 +1349,7 @@ public class MatrixBlock extends MatrixValue implements 
CacheBlock, Externalizab
                        copyDenseToDense(that);
        }
        
-       public void copyShallow(MatrixBlock that) {
+       public MatrixBlock copyShallow(MatrixBlock that) {
                rlen = that.rlen;
                clen = that.clen;
                nonZeros = that.nonZeros;
@@ -1358,6 +1358,7 @@ public class MatrixBlock extends MatrixValue implements 
CacheBlock, Externalizab
                        denseBlock = that.denseBlock;
                else
                        sparseBlock = that.sparseBlock;
+               return this;
        }
        
        private void copySparseToSparse(MatrixBlock that)

Reply via email to