Repository: systemml Updated Branches: refs/heads/master aefab8f8c -> d90073d80
[SYSTEMML-2031] Fix robustness of compression w/ incompressible data This patch improves the robustness of matrix compression by returning the uncompressed block if all columns are incompressible or the compression ratio is below 1. This avoids the unnecessary size and computation overhead of an indirection to a single uncompressed column group. Project: http://git-wip-us.apache.org/repos/asf/systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/78586a13 Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/78586a13 Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/78586a13 Branch: refs/heads/master Commit: 78586a13114b9fed585060e2ab8976ba6f9b50bd Parents: aefab8f Author: Matthias Boehm <[email protected]> Authored: Wed Nov 29 20:28:31 2017 -0800 Committer: Matthias Boehm <[email protected]> Committed: Wed Nov 29 22:30:53 2017 -0800 ---------------------------------------------------------------------- .../runtime/compress/CompressedMatrixBlock.java | 24 ++++++++++-- .../cp/CompressionCPInstruction.java | 8 ++-- .../spark/CompressionSPInstruction.java | 41 +++++++------------- .../sysml/runtime/matrix/data/MatrixBlock.java | 3 +- 4 files changed, 41 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/systemml/blob/78586a13/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java b/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java index 58d008a..e1a6a7b 100644 --- a/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java +++ b/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java @@ -214,21 +214,23 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable * +per column 
sparsity * * @throws DMLRuntimeException if DMLRuntimeException occurs + * @return compressed matrix block or original block if incompressible */ - public void compress() + public MatrixBlock compress() throws DMLRuntimeException { //default sequential execution - compress(1); + return compress(1); } /** * Compress block. * * @param k number of threads + * @return compressed matrix block or original block if incompressible * @throws DMLRuntimeException if DMLRuntimeException occurs */ - public void compress(int k) + public MatrixBlock compress(int k) throws DMLRuntimeException { //check for redundant compression @@ -304,7 +306,13 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable LOG.debug("Compression statistics:"); LOG.debug("--compression phase 1: "+_stats.timePhase1); } - + + if( colsC.isEmpty() ) { + if( LOG.isDebugEnabled() ) + LOG.debug("Abort block compression because all columns are incompressible."); + return new MatrixBlock().copyShallow(this); + } + // PHASE 2: Grouping columns // Divide the bitmap columns into column groups. 
List<int[]> bitmapColGrps = PlanningCoCoder.findCocodesByPartitioning( @@ -366,6 +374,12 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable _stats.size = estimateCompressedSizeInMemory(); _stats.ratio= estimateSizeInMemory() / _stats.size; + if( _stats.ratio < 1 ) { + if( LOG.isDebugEnabled() ) + LOG.debug("Abort block compression because compression ratio is less than 1."); + return new MatrixBlock().copyShallow(this); + } + //final cleanup (discard uncompressed block) rawblock.cleanupBlock(true, true); this.cleanupBlock(true, true); @@ -382,6 +396,8 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable LOG.debug("--compressed size: "+_stats.size); LOG.debug("--compression ratio: "+_stats.ratio); } + + return this; } public CompressionStatistics getCompressionStatistics() { http://git-wip-us.apache.org/repos/asf/systemml/blob/78586a13/src/main/java/org/apache/sysml/runtime/instructions/cp/CompressionCPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/cp/CompressionCPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/cp/CompressionCPInstruction.java index d9f6c32..630cdc2 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/cp/CompressionCPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/cp/CompressionCPInstruction.java @@ -38,7 +38,7 @@ public class CompressionCPInstruction extends UnaryCPInstruction { throws DMLRuntimeException { String[] parts = InstructionUtils.getInstructionPartsWithValueType(str); - String opcode = parts[0]; + String opcode = parts[0]; CPOperand in1 = new CPOperand(parts[1]); CPOperand out = new CPOperand(parts[2]); @@ -53,11 +53,11 @@ public class CompressionCPInstruction extends UnaryCPInstruction { MatrixBlock in = ec.getMatrixInput(input1.getName(), getExtendedOpcode()); //compress the matrix block - 
CompressedMatrixBlock cmb = new CompressedMatrixBlock(in); - cmb.compress(OptimizerUtils.getConstrainedNumThreads(-1)); + MatrixBlock out = new CompressedMatrixBlock(in) + .compress(OptimizerUtils.getConstrainedNumThreads(-1)); //set output and release input ec.releaseMatrixInput(input1.getName(), getExtendedOpcode()); - ec.setMatrixOutput(output.getName(), cmb, getExtendedOpcode()); + ec.setMatrixOutput(output.getName(), out, getExtendedOpcode()); } } http://git-wip-us.apache.org/repos/asf/systemml/blob/78586a13/src/main/java/org/apache/sysml/runtime/instructions/spark/CompressionSPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/CompressionSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/CompressionSPInstruction.java index ac93266..cdf3f26 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/CompressionSPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/CompressionSPInstruction.java @@ -35,53 +35,42 @@ public class CompressionSPInstruction extends UnarySPInstruction { private CompressionSPInstruction(Operator op, CPOperand in, CPOperand out, String opcode, String istr) { super(op, in, out, opcode, istr); - _sptype = SPINSTRUCTION_TYPE.Reorg; + _sptype = SPINSTRUCTION_TYPE.Compression; } public static CompressionSPInstruction parseInstruction ( String str ) - throws DMLRuntimeException + throws DMLRuntimeException { InstructionUtils.checkNumFields(str, 2); String[] parts = InstructionUtils.getInstructionPartsWithValueType(str); - - String opcode = parts[0]; - CPOperand in = new CPOperand(parts[1]); - CPOperand out = new CPOperand(parts[2]); - - return new CompressionSPInstruction(null, in, out, opcode, str); + return new CompressionSPInstruction(null, + new CPOperand(parts[1]), new CPOperand(parts[2]), parts[0], str); } @Override public void processInstruction(ExecutionContext 
ec) - throws DMLRuntimeException + throws DMLRuntimeException { SparkExecutionContext sec = (SparkExecutionContext)ec; - + //get input rdd handle - JavaPairRDD<MatrixIndexes,MatrixBlock> in = - sec.getBinaryBlockRDDHandleForVariable( input1.getName() ); - + JavaPairRDD<MatrixIndexes,MatrixBlock> in = + sec.getBinaryBlockRDDHandleForVariable( input1.getName() ); + //execute compression - JavaPairRDD<MatrixIndexes,MatrixBlock> out = - in.mapValues(new CompressionFunction()); - + JavaPairRDD<MatrixIndexes,MatrixBlock> out = + in.mapValues(new CompressionFunction()); + //set outputs sec.setRDDHandleForVariable(output.getName(), out); sec.addLineageRDD(input1.getName(), output.getName()); } - public static class CompressionFunction implements Function<MatrixBlock,MatrixBlock> - { + public static class CompressionFunction implements Function<MatrixBlock,MatrixBlock> { private static final long serialVersionUID = -6528833083609423922L; - @Override - public MatrixBlock call(MatrixBlock arg0) - throws Exception - { - CompressedMatrixBlock cmb = new CompressedMatrixBlock(arg0); - cmb.compress(); - - return cmb; + public MatrixBlock call(MatrixBlock arg0) throws Exception { + return new CompressedMatrixBlock(arg0).compress(); } } } http://git-wip-us.apache.org/repos/asf/systemml/blob/78586a13/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java index 22b20e4..c893bc6 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java @@ -1349,7 +1349,7 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab copyDenseToDense(that); } - public void copyShallow(MatrixBlock that) { + public MatrixBlock copyShallow(MatrixBlock that) { 
rlen = that.rlen; clen = that.clen; nonZeros = that.nonZeros; @@ -1358,6 +1358,7 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab denseBlock = that.denseBlock; else sparseBlock = that.sparseBlock; + return this; } private void copySparseToSparse(MatrixBlock that)
