[SYSTEMML-811] Compiler integration compressed linear algebra Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/cbc4509b Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/cbc4509b Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/cbc4509b
Branch: refs/heads/master Commit: cbc4509ba48b3843b10dbc532649c42aa1e302d8 Parents: 616793d Author: Matthias Boehm <[email protected]> Authored: Sat Jul 16 20:10:00 2016 -0700 Committer: Matthias Boehm <[email protected]> Committed: Sat Jul 16 20:10:00 2016 -0700 ---------------------------------------------------------------------- .../java/org/apache/sysml/conf/DMLConfig.java | 2 + src/main/java/org/apache/sysml/hops/Hop.java | 64 +++++++++++++- .../sysml/hops/rewrite/ProgramRewriter.java | 1 + .../hops/rewrite/RewriteCompressedReblock.java | 87 ++++++++++++++++++++ .../java/org/apache/sysml/lops/Compression.java | 81 ++++++++++++++++++ 5 files changed, 231 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/cbc4509b/src/main/java/org/apache/sysml/conf/DMLConfig.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/conf/DMLConfig.java b/src/main/java/org/apache/sysml/conf/DMLConfig.java index 15ec73e..b87b476 100644 --- a/src/main/java/org/apache/sysml/conf/DMLConfig.java +++ b/src/main/java/org/apache/sysml/conf/DMLConfig.java @@ -70,6 +70,7 @@ public class DMLConfig public static final String YARN_APPQUEUE = "dml.yarn.app.queue"; public static final String CP_PARALLEL_MATRIXMULT = "cp.parallel.matrixmult"; public static final String CP_PARALLEL_TEXTIO = "cp.parallel.textio"; + public static final String COMPRESSED_LINALG = "compressed.linalg"; // supported prefixes for custom map/reduce configurations public static final String PREFIX_MAPRED = "mapred"; @@ -100,6 +101,7 @@ public class DMLConfig _defaultVals.put(YARN_APPQUEUE, "default" ); _defaultVals.put(CP_PARALLEL_MATRIXMULT, "true" ); _defaultVals.put(CP_PARALLEL_TEXTIO, "true" ); + _defaultVals.put(COMPRESSED_LINALG, "false" ); } public DMLConfig() 
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/cbc4509b/src/main/java/org/apache/sysml/hops/Hop.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/hops/Hop.java b/src/main/java/org/apache/sysml/hops/Hop.java index 144ca20..7d69940 100644 --- a/src/main/java/org/apache/sysml/hops/Hop.java +++ b/src/main/java/org/apache/sysml/hops/Hop.java @@ -29,6 +29,7 @@ import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM; import org.apache.sysml.conf.ConfigurationManager; import org.apache.sysml.lops.CSVReBlock; import org.apache.sysml.lops.Checkpoint; +import org.apache.sysml.lops.Compression; import org.apache.sysml.lops.Data; import org.apache.sysml.lops.Lop; import org.apache.sysml.lops.LopsException; @@ -108,6 +109,10 @@ public abstract class Hop // (usually this happens on persistent reads dataops) protected boolean _requiresReblock = false; + // indicates if the output of this hop needs to be compressed + // (this happens on persistent reads after reblock but before checkpoint) + protected boolean _requiresCompression = false; + // indicates if the output of this hop needs to be checkpointed (cached) // (the default storage level for caching is not yet exposed here) protected boolean _requiresCheckpoint = false; @@ -234,11 +239,18 @@ public abstract class Hop } } - public void setRequiresReblock(boolean flag) - { + public void setRequiresReblock(boolean flag) { _requiresReblock = flag; } + public void setRequiresCompression(boolean flag) { + _requiresCompression = flag; + } + + public boolean requiresCompression() { + return _requiresCompression; + } + public boolean hasMatrixInputWithDifferentBlocksizes() { for( Hop c : getInput() ) { @@ -285,7 +297,10 @@ public abstract class Hop //Step 1: construct reblock lop if required (output of hop) constructAndSetReblockLopIfRequired(); - //Step 2: construct checkpoint lop if required (output of hop or reblock) + //Step 2: construct compression 
lop if required + constructAndSetCompressionLopIfRequired(); + + //Step 3: construct checkpoint lop if required (output of hop or reblock) constructAndSetCheckpointLopIfRequired(); } @@ -397,8 +412,49 @@ public abstract class Hop catch( LopsException ex ) { throw new HopsException(ex); } + } + } + + /** + * + * @throws HopsException + */ + private void constructAndSetCompressionLopIfRequired() + throws HopsException + { + //determine execution type + ExecType et = ExecType.CP; + if( OptimizerUtils.isSparkExecutionMode() + && getDataType()!=DataType.SCALAR ) + { + //conditional CP execution based on memory estimate: in hybrid mode, compress in CP + //if 2x the output memory estimate fits in the local memory budget, or if CP is forced + if( OptimizerUtils.isHybridExecutionMode() + && 2*_outputMemEstimate < OptimizerUtils.getLocalMemBudget() + || _etypeForced == ExecType.CP ) + { + et = ExecType.CP; + } + else //default case + { + et = ExecType.SPARK; + } + } + + //add compression lop to output if required + if( _requiresCompression ) + { + try + { + Lop compress = new Compression(getLops(), getDataType(), getValueType(), et); + setOutputDimensions( compress ); + setLineNumbers( compress ); + setLops( compress ); + } + catch( LopsException ex ) { + throw new HopsException(ex); + } } - } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/cbc4509b/src/main/java/org/apache/sysml/hops/rewrite/ProgramRewriter.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/hops/rewrite/ProgramRewriter.java b/src/main/java/org/apache/sysml/hops/rewrite/ProgramRewriter.java index 49aa0db..8e645dc 100644 --- a/src/main/java/org/apache/sysml/hops/rewrite/ProgramRewriter.java +++ b/src/main/java/org/apache/sysml/hops/rewrite/ProgramRewriter.java @@ -92,6 +92,7 @@ public class ProgramRewriter _dagRuleSet.add( new RewriteTransientWriteParentHandling() ); _dagRuleSet.add( new RewriteRemoveReadAfterWrite() ); //dependency: before blocksize 
_dagRuleSet.add( new RewriteBlockSizeAndReblock() ); + _dagRuleSet.add( new RewriteCompressedReblock() ); _dagRuleSet.add( new RewriteRemoveUnnecessaryCasts() ); if( OptimizerUtils.ALLOW_COMMON_SUBEXPRESSION_ELIMINATION ) _dagRuleSet.add( new RewriteCommonSubexpressionElimination() ); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/cbc4509b/src/main/java/org/apache/sysml/hops/rewrite/RewriteCompressedReblock.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/hops/rewrite/RewriteCompressedReblock.java b/src/main/java/org/apache/sysml/hops/rewrite/RewriteCompressedReblock.java new file mode 100644 index 0000000..3faf4bc --- /dev/null +++ b/src/main/java/org/apache/sysml/hops/rewrite/RewriteCompressedReblock.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.sysml.hops.rewrite; + +import java.util.ArrayList; + +import org.apache.sysml.conf.ConfigurationManager; +import org.apache.sysml.conf.DMLConfig; +import org.apache.sysml.hops.DataOp; +import org.apache.sysml.hops.Hop; +import org.apache.sysml.hops.HopsException; +import org.apache.sysml.hops.Hop.DataOpTypes; + +/** + * Rule: CompressedReblock: If config compressed.linalg is enabled, we + * inject compression hooks after pread of matrices w/ both dims > 1. + */ +public class RewriteCompressedReblock extends HopRewriteRule +{ + + @Override + public ArrayList<Hop> rewriteHopDAGs(ArrayList<Hop> roots, ProgramRewriteStatus state) + throws HopsException + { + if( roots == null ) + return null; + + boolean compress = ConfigurationManager.getDMLConfig() + .getBooleanValue(DMLConfig.COMPRESSED_LINALG); + + //perform compressed reblock rewrite + if( compress ) + for( Hop h : roots ) + rule_CompressedReblock(h); + + return roots; + } + + @Override + public Hop rewriteHopDAG(Hop root, ProgramRewriteStatus state) + throws HopsException + { + //do nothing (pread will never occur in predicate) + return root; + } + + /** + * + * @param hop + * @throws HopsException + */ + private void rule_CompressedReblock(Hop hop) + throws HopsException + { + // Go to the source(s) of the DAG + for (Hop hi : hop.getInput()) { + if (hi.getVisited() != Hop.VisitStatus.DONE) + rule_CompressedReblock(hi); + } + + if( hop instanceof DataOp + && ((DataOp)hop).getDataOpType()==DataOpTypes.PERSISTENTREAD + && hop.getDim1() > 1 && hop.getDim2() > 1 ) + { + hop.setRequiresCompression(true); + } + + hop.setVisited(Hop.VisitStatus.DONE); + } +} http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/cbc4509b/src/main/java/org/apache/sysml/lops/Compression.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/lops/Compression.java b/src/main/java/org/apache/sysml/lops/Compression.java new file mode 100644 
index 0000000..54dc445 --- /dev/null +++ b/src/main/java/org/apache/sysml/lops/Compression.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysml.lops; + +import org.apache.sysml.lops.LopProperties.ExecLocation; +import org.apache.sysml.lops.LopProperties.ExecType; +import org.apache.sysml.lops.compile.JobType; +import org.apache.sysml.parser.Expression.DataType; +import org.apache.sysml.parser.Expression.ValueType; + + +/** + * + */ +public class Compression extends Lop +{ + public static final String OPCODE = "compress"; + + /** + * + * @param input + * @param dt + * @param vt + * @param level + * @param et + * @throws LopsException + */ + public Compression(Lop input, DataType dt, ValueType vt, ExecType et) + throws LopsException + { + super(Lop.Type.Checkpoint, dt, vt); + this.addInput(input); + input.addOutput(this); + + boolean breaksAlignment = false; + boolean aligner = false; + boolean definesMRJob = false; + + lps.addCompatibility(JobType.INVALID); + lps.setProperties( inputs, et, ExecLocation.ControlProgram, breaksAlignment, aligner, definesMRJob ); + } + + + @Override + public String toString() { + return "Compress"; + } + + @Override + public String 
getInstructions(String input1, String output) + throws LopsException + { + StringBuilder sb = new StringBuilder(); + sb.append( getExecType() ); + sb.append( Lop.OPERAND_DELIMITOR ); + sb.append( OPCODE ); + sb.append( OPERAND_DELIMITOR ); + sb.append( getInputs().get(0).prepInputOperand(input1)); + sb.append( OPERAND_DELIMITOR ); + sb.append( prepOutputOperand(output)); + + return sb.toString(); + } +} \ No newline at end of file
