[SYSTEMML-811] Compiler integration compressed linear algebra

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/cbc4509b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/cbc4509b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/cbc4509b

Branch: refs/heads/master
Commit: cbc4509ba48b3843b10dbc532649c42aa1e302d8
Parents: 616793d
Author: Matthias Boehm <[email protected]>
Authored: Sat Jul 16 20:10:00 2016 -0700
Committer: Matthias Boehm <[email protected]>
Committed: Sat Jul 16 20:10:00 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/sysml/conf/DMLConfig.java   |  2 +
 src/main/java/org/apache/sysml/hops/Hop.java    | 64 +++++++++++++-
 .../sysml/hops/rewrite/ProgramRewriter.java     |  1 +
 .../hops/rewrite/RewriteCompressedReblock.java  | 87 ++++++++++++++++++++
 .../java/org/apache/sysml/lops/Compression.java | 81 ++++++++++++++++++
 5 files changed, 231 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/cbc4509b/src/main/java/org/apache/sysml/conf/DMLConfig.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/conf/DMLConfig.java 
b/src/main/java/org/apache/sysml/conf/DMLConfig.java
index 15ec73e..b87b476 100644
--- a/src/main/java/org/apache/sysml/conf/DMLConfig.java
+++ b/src/main/java/org/apache/sysml/conf/DMLConfig.java
@@ -70,6 +70,7 @@ public class DMLConfig
        public static final String YARN_APPQUEUE        = "dml.yarn.app.queue"; 
        public static final String CP_PARALLEL_MATRIXMULT = 
"cp.parallel.matrixmult";
        public static final String CP_PARALLEL_TEXTIO   = "cp.parallel.textio";
+       public static final String COMPRESSED_LINALG    = "compressed.linalg";
 
        // supported prefixes for custom map/reduce configurations
        public static final String PREFIX_MAPRED = "mapred";
@@ -100,6 +101,7 @@ public class DMLConfig
                _defaultVals.put(YARN_APPQUEUE,              "default" );
                _defaultVals.put(CP_PARALLEL_MATRIXMULT, "true" );
                _defaultVals.put(CP_PARALLEL_TEXTIO,     "true" );
+               _defaultVals.put(COMPRESSED_LINALG,      "false" );
        }
        
        public DMLConfig()

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/cbc4509b/src/main/java/org/apache/sysml/hops/Hop.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/hops/Hop.java 
b/src/main/java/org/apache/sysml/hops/Hop.java
index 144ca20..7d69940 100644
--- a/src/main/java/org/apache/sysml/hops/Hop.java
+++ b/src/main/java/org/apache/sysml/hops/Hop.java
@@ -29,6 +29,7 @@ import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM;
 import org.apache.sysml.conf.ConfigurationManager;
 import org.apache.sysml.lops.CSVReBlock;
 import org.apache.sysml.lops.Checkpoint;
+import org.apache.sysml.lops.Compression;
 import org.apache.sysml.lops.Data;
 import org.apache.sysml.lops.Lop;
 import org.apache.sysml.lops.LopsException;
@@ -108,6 +109,10 @@ public abstract class Hop
        // (usually this happens on persistent reads dataops)
        protected boolean _requiresReblock = false;
        
+       // indicates if the output of this hop needs to be compressed
+       // (this happens on persistent reads after reblock but before 
checkpoint)
+       protected boolean _requiresCompression = false;
+       
        // indicates if the output of this hop needs to be checkpointed (cached)
        // (the default storage level for caching is not yet exposed here)
        protected boolean _requiresCheckpoint = false;
@@ -234,11 +239,18 @@ public abstract class Hop
                }
        }
        
-       public void setRequiresReblock(boolean flag)
-       {
+       public void setRequiresReblock(boolean flag) {
                _requiresReblock = flag;
        }
        
+       public void setRequiresCompression(boolean flag) {
+               _requiresCompression = flag;
+       }
+       
+       public boolean requiresCompression() {
+               return _requiresCompression;
+       }
+       
        public boolean hasMatrixInputWithDifferentBlocksizes()
        {
                for( Hop c : getInput() ) {
@@ -285,7 +297,10 @@ public abstract class Hop
                //Step 1: construct reblock lop if required (output of hop)
                constructAndSetReblockLopIfRequired();
                
-               //Step 2: construct checkpoint lop if required (output of hop 
or reblock)
+               //Step 2: construct compression lop if required
+               constructAndSetCompressionLopIfRequired();
+               
+               //Step 3: construct checkpoint lop if required (output of hop 
or reblock)
                constructAndSetCheckpointLopIfRequired();
        }
        
@@ -397,8 +412,49 @@ public abstract class Hop
                        catch( LopsException ex ) {
                                throw new HopsException(ex);
                        }
+               }       
+       }
+       
+       /**
+        * Wraps the current lop of this hop in a Compression lop if compression is required.
+        * @throws HopsException
+        */
+       private void constructAndSetCompressionLopIfRequired() 
+               throws HopsException
+       {
+               //determine execution type
+               ExecType et = ExecType.CP;
+               if( OptimizerUtils.isSparkExecutionMode() 
+                       && getDataType()!=DataType.SCALAR )
+               {
+                       //conditional CP execution based on memory estimate: compress in CP only if, in hybrid
+                       //mode, 2x the output estimate is below the local memory budget, or if CP is forced
+                       if(    OptimizerUtils.isHybridExecutionMode() 
+                               && 2*_outputMemEstimate < 
OptimizerUtils.getLocalMemBudget()
+                               || _etypeForced == ExecType.CP )
+                       {
+                               et = ExecType.CP;
+                       }
+                       else //default case
+                       {
+                               et = ExecType.SPARK;
+                       }
+               }
+
+               //add compression lop to output if required
+               if( _requiresCompression )
+               {
+                       try
+                       {
+                               Lop compress = new Compression(getLops(), 
getDataType(), getValueType(), et);                           
+                               setOutputDimensions( compress );
+                               setLineNumbers( compress );
+                               setLops( compress );
+                       }
+                       catch( LopsException ex ) {
+                               throw new HopsException(ex);
+                       }
                }
-               
        }
        
        

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/cbc4509b/src/main/java/org/apache/sysml/hops/rewrite/ProgramRewriter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/hops/rewrite/ProgramRewriter.java 
b/src/main/java/org/apache/sysml/hops/rewrite/ProgramRewriter.java
index 49aa0db..8e645dc 100644
--- a/src/main/java/org/apache/sysml/hops/rewrite/ProgramRewriter.java
+++ b/src/main/java/org/apache/sysml/hops/rewrite/ProgramRewriter.java
@@ -92,6 +92,7 @@ public class ProgramRewriter
                        _dagRuleSet.add(     new 
RewriteTransientWriteParentHandling()       );
                        _dagRuleSet.add(     new RewriteRemoveReadAfterWrite()  
             ); //dependency: before blocksize
                        _dagRuleSet.add(     new RewriteBlockSizeAndReblock()   
             );
+                       _dagRuleSet.add(     new RewriteCompressedReblock()     
             );
                        _dagRuleSet.add(     new 
RewriteRemoveUnnecessaryCasts()             );         
                        if( 
OptimizerUtils.ALLOW_COMMON_SUBEXPRESSION_ELIMINATION )
                                _dagRuleSet.add( new 
RewriteCommonSubexpressionElimination()     );

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/cbc4509b/src/main/java/org/apache/sysml/hops/rewrite/RewriteCompressedReblock.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/hops/rewrite/RewriteCompressedReblock.java 
b/src/main/java/org/apache/sysml/hops/rewrite/RewriteCompressedReblock.java
new file mode 100644
index 0000000..3faf4bc
--- /dev/null
+++ b/src/main/java/org/apache/sysml/hops/rewrite/RewriteCompressedReblock.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.hops.rewrite;
+
+import java.util.ArrayList;
+
+import org.apache.sysml.conf.ConfigurationManager;
+import org.apache.sysml.conf.DMLConfig;
+import org.apache.sysml.hops.DataOp;
+import org.apache.sysml.hops.Hop;
+import org.apache.sysml.hops.HopsException;
+import org.apache.sysml.hops.Hop.DataOpTypes;
+
+/**
+ * Rule: CompressedReblock: If config compressed.linalg is enabled, we
+ * inject compression hooks after pread of matrices w/ both dims > 1.
+ */
+public class RewriteCompressedReblock extends HopRewriteRule
+{
+       
+       @Override
+       public ArrayList<Hop> rewriteHopDAGs(ArrayList<Hop> roots, 
ProgramRewriteStatus state)
+               throws HopsException
+       {
+               if( roots == null )
+                       return null;
+               
+               boolean compress = ConfigurationManager.getDMLConfig()
+                               .getBooleanValue(DMLConfig.COMPRESSED_LINALG);
+               
+               //perform compressed reblock rewrite
+               if( compress )
+                       for( Hop h : roots ) 
+                               rule_CompressedReblock(h);
+               
+               return roots;
+       }
+
+       @Override
+       public Hop rewriteHopDAG(Hop root, ProgramRewriteStatus state) 
+               throws HopsException
+       {
+               //do nothing (pread will never occur in predicate)
+               return root;
+       }
+
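+       /**
+        * Recursively visits unvisited inputs and flags persistent-read DataOps with both dims > 1 for compression.
+        * @param hop
+        * @throws HopsException
+        */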
+       private void rule_CompressedReblock(Hop hop) 
+               throws HopsException 
+       {
+               // Go to the source(s) of the DAG
+               for (Hop hi : hop.getInput()) {
+                       if (hi.getVisited() != Hop.VisitStatus.DONE)
+                               rule_CompressedReblock(hi);
+               }
+
+               if( hop instanceof DataOp 
+                       && 
((DataOp)hop).getDataOpType()==DataOpTypes.PERSISTENTREAD
+                       && hop.getDim1() > 1 && hop.getDim2() > 1 ) 
+               {
+                       hop.setRequiresCompression(true);
+               }
+
+               hop.setVisited(Hop.VisitStatus.DONE);
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/cbc4509b/src/main/java/org/apache/sysml/lops/Compression.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/lops/Compression.java 
b/src/main/java/org/apache/sysml/lops/Compression.java
new file mode 100644
index 0000000..54dc445
--- /dev/null
+++ b/src/main/java/org/apache/sysml/lops/Compression.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.lops;
+
+import org.apache.sysml.lops.LopProperties.ExecLocation;
+import org.apache.sysml.lops.LopProperties.ExecType;
+import org.apache.sysml.lops.compile.JobType;
+import org.apache.sysml.parser.Expression.DataType;
+import org.apache.sysml.parser.Expression.ValueType;
+
+
+/**
+ * Lop that takes a single input lop and generates the "compress" instruction for it.
+ */
+public class Compression extends Lop 
+{
+       public static final String OPCODE = "compress"; 
+
+       /**
+        * Creates a compression lop over the given input lop.
+        * 
+        * @param input
+        * @param dt
+        * @param vt
+        * @param et
+        * @throws LopsException
+        */
+       public Compression(Lop input, DataType dt, ValueType vt, ExecType et) 
+               throws LopsException
+       {
+               super(Lop.Type.Checkpoint, dt, vt);             
+               this.addInput(input);
+               input.addOutput(this);
+               
+               boolean breaksAlignment = false;
+               boolean aligner = false;
+               boolean definesMRJob = false;
+               
+               lps.addCompatibility(JobType.INVALID);
+               lps.setProperties( inputs, et, ExecLocation.ControlProgram, 
breaksAlignment, aligner, definesMRJob );
+       }
+
+       
+       @Override
+       public String toString() {
+               return "Compress";
+       }
+       
+       @Override
+       public String getInstructions(String input1, String output) 
+               throws LopsException 
+       {
+               StringBuilder sb = new StringBuilder();
+               sb.append( getExecType() );
+               sb.append( Lop.OPERAND_DELIMITOR );
+               sb.append( OPCODE );
+               sb.append( OPERAND_DELIMITOR );
+               sb.append( getInputs().get(0).prepInputOperand(input1));
+               sb.append( OPERAND_DELIMITOR );
+               sb.append( prepOutputOperand(output));
+               
+               return sb.toString();
+       }
+}
\ No newline at end of file

Reply via email to