This is an automated email from the ASF dual-hosted git repository.

arnabp20 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/master by this push:
     new b84a2d1  [SYSTEMDS-2650] Re-computation from lineage with dedup
b84a2d1 is described below

commit b84a2d16e1796a6d68baf9e16b7ab87982c8e34d
Author: arnabp <arnab.ph...@tugraz.at>
AuthorDate: Wed Aug 19 20:45:36 2020 +0200

    [SYSTEMDS-2650] Re-computation from lineage with dedup
    
    This patch makes the following changes:
      - Compile all the dedup patches into functions.
      - Place a function call in the main program for each dedupOp,
        which calls the corresponding function.
      - Move the recomputation-related code to a new class
        (LineageRecomputeUtils).
      - Add a new test class that compares the recomputed results
        with the original outputs.
    
    The current code doesn't support multiple loops. Future commits
    will add optimizations to construct multi-return functions
    (instead of one function per output variable) and to compile
    sequences of equivalent function calls into loops.
---
 .../org/apache/sysds/parser/DataIdentifier.java    |   7 +
 .../sysds/runtime/lineage/LineageDedupUtils.java   |  34 ++
 .../apache/sysds/runtime/lineage/LineageItem.java  |   4 +
 .../sysds/runtime/lineage/LineageItemUtils.java    | 394 ----------------
 .../apache/sysds/runtime/lineage/LineageMap.java   |   7 +-
 .../sysds/runtime/lineage/LineageParser.java       |  17 +-
 .../runtime/lineage/LineageRecomputeUtils.java     | 522 +++++++++++++++++++++
 .../org/apache/sysds/test/AutomatedTestBase.java   |   4 +
 .../test/functions/lineage/LineageCodegenTest.java |   7 +-
 .../functions/lineage/LineageTraceBuiltinTest.java |   7 +-
 ...tinTest.java => LineageTraceDedupExecTest.java} |  47 +-
 .../lineage/LineageTraceExecSparkTest.java         |   6 +-
 .../functions/lineage/LineageTraceExecTest.java    |   7 +-
 .../lineage/LineageTraceFunctionTest.java          |   7 +-
 .../functions/lineage/LineageTraceParforTest.java  |   7 +-
 .../functions/lineage/LineageTraceDedupExec1.dml   |  33 ++
 .../functions/lineage/LineageTraceDedupExec10.dml  |  34 ++
 .../functions/lineage/LineageTraceDedupExec2.dml   |  45 ++
 18 files changed, 733 insertions(+), 456 deletions(-)

diff --git a/src/main/java/org/apache/sysds/parser/DataIdentifier.java 
b/src/main/java/org/apache/sysds/parser/DataIdentifier.java
index b58b0d9..22002d6 100644
--- a/src/main/java/org/apache/sysds/parser/DataIdentifier.java
+++ b/src/main/java/org/apache/sysds/parser/DataIdentifier.java
@@ -20,6 +20,7 @@
 package org.apache.sysds.parser;
 
 import org.apache.sysds.common.Types.DataType;
+import org.apache.sysds.common.Types.ValueType;
 
 public class DataIdentifier extends Identifier
 {
@@ -44,6 +45,12 @@ public class DataIdentifier extends Identifier
                this(name);
                _dataType = dt;
        }
+
+       public DataIdentifier(String name, DataType dt, ValueType vt){
+               this(name);
+               _dataType = dt;
+               _valueType = vt;
+       }
        
        public DataIdentifier(){
                _name = null;
diff --git 
a/src/main/java/org/apache/sysds/runtime/lineage/LineageDedupUtils.java 
b/src/main/java/org/apache/sysds/runtime/lineage/LineageDedupUtils.java
index 6acc248..230e5e1 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageDedupUtils.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageDedupUtils.java
@@ -20,6 +20,7 @@
 package org.apache.sysds.runtime.lineage;
 
 import java.util.ArrayList;
+import java.util.Map;
 
 import org.apache.sysds.runtime.controlprogram.BasicProgramBlock;
 import org.apache.sysds.runtime.controlprogram.ForProgramBlock;
@@ -28,8 +29,10 @@ import 
org.apache.sysds.runtime.controlprogram.IfProgramBlock;
 import org.apache.sysds.runtime.controlprogram.ProgramBlock;
 import org.apache.sysds.runtime.controlprogram.WhileProgramBlock;
 import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
+import org.apache.sysds.utils.Explain;
 
 public class LineageDedupUtils {
+       public static final String DEDUP_DELIM = "_";
        private static Lineage _tmpLineage = null;
        private static Lineage _mainLineage = null;
        private static ArrayList<Long> _numDistinctPaths = new ArrayList<>();
@@ -131,7 +134,38 @@ public class LineageDedupUtils {
                _tmpLineage.clearLineageMap();
                _tmpLineage.clearDedupBlock();
        }
+
+       public static String mergeExplainDedupBlocks(ExecutionContext ec) {
+               Map<ProgramBlock, LineageDedupBlock> dedupBlocks = 
ec.getLineage().getDedupBlocks();
+               StringBuilder sb = new StringBuilder();
+               // Gather all the DAG roots of all the paths in all the loops.
+               for (Map.Entry<ProgramBlock, LineageDedupBlock> dblock : 
dedupBlocks.entrySet()) {
+                       if (dblock.getValue() != null) {
+                               String forKey = 
dblock.getKey().getStatementBlock().getName();
+                               LineageDedupBlock dedup = dblock.getValue();
+                               for (Map.Entry<Long, LineageMap> patch : 
dedup.getPathMaps().entrySet()) {
+                                       for (Map.Entry<String, LineageItem> 
root : patch.getValue().getTraces().entrySet()) {
+                                               // Encode all the information 
in the headers that're
+                                               // needed by the 
deserialization logic.
+                                               sb.append("patch");
+                                               sb.append(DEDUP_DELIM);
+                                               sb.append(root.getKey());
+                                               sb.append(DEDUP_DELIM);
+                                               sb.append(forKey);
+                                               sb.append(DEDUP_DELIM);
+                                               sb.append(patch.getKey());
+                                               sb.append("\n");
+                                               
sb.append(Explain.explain(root.getValue()));
+                                               sb.append("\n");
+                                               
+                                       }
+                               }
+                       }
+               }
+               return sb.toString();
+       }
        
+       
//------------------------------------------------------------------------------
        /* The below static functions help to compute the number of distinct 
paths
         * in any program block, and are used for diagnostic purposes. These 
will
         * be removed in future.
diff --git a/src/main/java/org/apache/sysds/runtime/lineage/LineageItem.java 
b/src/main/java/org/apache/sysds/runtime/lineage/LineageItem.java
index 2a9891a..88f2fb8 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageItem.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageItem.java
@@ -195,6 +195,10 @@ public class LineageItem {
                return !_opcode.isEmpty();
        }
        
+       public boolean isDedup() {
+               return _opcode.startsWith(dedupItemOpcode);
+       }
+       
        /**
         * Non-recursive equivalent of {@link #resetVisitStatus()} 
         * for robustness with regard to stack overflow errors.
diff --git 
a/src/main/java/org/apache/sysds/runtime/lineage/LineageItemUtils.java 
b/src/main/java/org/apache/sysds/runtime/lineage/LineageItemUtils.java
index 9a96828..f40582d 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageItemUtils.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageItemUtils.java
@@ -19,7 +19,6 @@
 
 package org.apache.sysds.runtime.lineage;
 
-import org.apache.commons.lang.NotImplementedException;
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
@@ -30,23 +29,10 @@ import 
org.apache.sysds.runtime.lineage.LineageCacheConfig.ReuseCacheType;
 import org.apache.sysds.runtime.lineage.LineageItem.LineageItemType;
 import org.apache.sysds.api.DMLScript;
 import org.apache.sysds.common.Types.AggOp;
-import org.apache.sysds.common.Types.DataType;
 import org.apache.sysds.common.Types.Direction;
-import org.apache.sysds.common.Types.OpOp1;
-import org.apache.sysds.common.Types.OpOp2;
-import org.apache.sysds.common.Types.OpOp3;
-import org.apache.sysds.common.Types.OpOpDG;
-import org.apache.sysds.common.Types.OpOpData;
-import org.apache.sysds.common.Types.OpOpN;
-import org.apache.sysds.common.Types.ParamBuiltinOp;
-import org.apache.sysds.common.Types.ReOrgOp;
-import org.apache.sysds.common.Types.ValueType;
-import org.apache.sysds.conf.ConfigurationManager;
 import org.apache.sysds.hops.AggBinaryOp;
 import org.apache.sysds.hops.AggUnaryOp;
 import org.apache.sysds.hops.BinaryOp;
-import org.apache.sysds.hops.DataGenOp;
-import org.apache.sysds.hops.DataOp;
 import org.apache.sysds.hops.Hop;
 import org.apache.sysds.hops.IndexingOp;
 import org.apache.sysds.hops.LiteralOp;
@@ -54,41 +40,26 @@ import org.apache.sysds.hops.ReorgOp;
 import org.apache.sysds.hops.TernaryOp;
 import org.apache.sysds.hops.UnaryOp;
 import org.apache.sysds.hops.codegen.SpoofFusedOp;
-import org.apache.sysds.hops.rewrite.HopRewriteUtils;
-import org.apache.sysds.lops.Lop;
 import org.apache.sysds.lops.PartialAggregate;
 import org.apache.sysds.lops.UnaryCP;
 import org.apache.sysds.lops.compile.Dag;
-import org.apache.sysds.parser.DataExpression;
-import org.apache.sysds.parser.DataIdentifier;
-import org.apache.sysds.parser.Statement;
 import org.apache.sysds.runtime.DMLRuntimeException;
-import org.apache.sysds.runtime.controlprogram.BasicProgramBlock;
-import org.apache.sysds.runtime.controlprogram.Program;
-import org.apache.sysds.runtime.controlprogram.ProgramBlock;
 import org.apache.sysds.runtime.controlprogram.caching.CacheableData;
 import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
-import org.apache.sysds.runtime.controlprogram.context.ExecutionContextFactory;
 import org.apache.sysds.runtime.instructions.Instruction;
 import org.apache.sysds.runtime.instructions.InstructionParser;
-import org.apache.sysds.runtime.instructions.InstructionUtils;
 import org.apache.sysds.runtime.instructions.cp.CPOperand;
 import org.apache.sysds.runtime.instructions.cp.Data;
 import org.apache.sysds.runtime.instructions.cp.DataGenCPInstruction;
 import org.apache.sysds.runtime.instructions.cp.ScalarObject;
-import org.apache.sysds.runtime.instructions.cp.ScalarObjectFactory;
 import org.apache.sysds.runtime.instructions.cp.VariableCPInstruction;
-import org.apache.sysds.runtime.instructions.spark.SPInstruction.SPType;
-import org.apache.sysds.runtime.instructions.cp.CPInstruction.CPType;
 import org.apache.sysds.runtime.util.HDFSTool;
-import org.apache.sysds.utils.Explain;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
@@ -97,9 +68,7 @@ import java.util.stream.Collectors;
 
 public class LineageItemUtils {
        
-       private static final String LVARPREFIX = "lvar";
        public static final String LPLACEHOLDER = "IN#";
-       public static final String DEDUP_DELIM = "_";
        
        public static LineageItemType getType(String str) {
                if (str.length() == 1) {
@@ -159,284 +128,11 @@ public class LineageItemUtils {
                return sb.toString().trim();
        }
        
-       public static Data computeByLineage(LineageItem root) {
-               long rootId = root.getOpcode().equals("write") ?
-                       root.getInputs()[0].getId() : root.getId();
-               String varname = LVARPREFIX + rootId;
-               
-               //recursively construct hops 
-               root.resetVisitStatusNR();
-               Map<Long, Hop> operands = new HashMap<>();
-               rConstructHops(root, operands);
-               Hop out = HopRewriteUtils.createTransientWrite(
-                       varname, operands.get(rootId));
-               
-               //generate instructions for temporary hops
-               ExecutionContext ec = ExecutionContextFactory.createContext();
-               BasicProgramBlock pb = new BasicProgramBlock(new Program());
-               Dag<Lop> dag = new Dag<>();
-               Lop lops = out.constructLops();
-               lops.addToDag(dag);
-               pb.setInstructions(dag.getJobs(null,
-                       ConfigurationManager.getDMLConfig()));
-               
-               // reset cache due to cleaned data objects
-               LineageCache.resetCache();
-               //execute instructions and get result
-               pb.execute(ec);
-               return ec.getVariable(varname);
-       }
-       
        public static LineageItem[] getLineage(ExecutionContext ec, 
CPOperand... operands) {
                return Arrays.stream(operands).filter(c -> c!=null)
                        .map(c -> 
ec.getLineage().getOrCreate(c)).toArray(LineageItem[]::new);
        }
        
-       private static void rConstructHops(LineageItem item, Map<Long, Hop> 
operands) {
-               if (item.isVisited())
-                       return;
-               
-               //recursively process children (ordering by data dependencies)
-               if (!item.isLeaf())
-                       for (LineageItem c : item.getInputs())
-                               rConstructHops(c, operands);
-               
-               //process current lineage item
-               //NOTE: we generate instructions from hops (but without 
rewrites) to automatically
-               //handle execution types, rmvar instructions, and rewiring of 
inputs/outputs
-               switch (item.getType()) {
-                       case Creation: {
-                               Instruction inst = 
InstructionParser.parseSingleInstruction(item.getData());
-                               
-                               if (inst instanceof DataGenCPInstruction) {
-                                       DataGenCPInstruction rand = 
(DataGenCPInstruction) inst;
-                                       HashMap<String, Hop> params = new 
HashMap<>();
-                                       if( rand.getOpcode().equals("rand") ) {
-                                               if( rand.output.getDataType() 
== DataType.TENSOR)
-                                                       
params.put(DataExpression.RAND_DIMS, new LiteralOp(rand.getDims()));
-                                               else {
-                                                       
params.put(DataExpression.RAND_ROWS, new LiteralOp(rand.getRows()));
-                                                       
params.put(DataExpression.RAND_COLS, new LiteralOp(rand.getCols()));
-                                               }
-                                               
params.put(DataExpression.RAND_MIN, new LiteralOp(rand.getMinValue()));
-                                               
params.put(DataExpression.RAND_MAX, new LiteralOp(rand.getMaxValue()));
-                                               
params.put(DataExpression.RAND_PDF, new LiteralOp(rand.getPdf()));
-                                               
params.put(DataExpression.RAND_LAMBDA, new LiteralOp(rand.getPdfParams()));
-                                               
params.put(DataExpression.RAND_SPARSITY, new LiteralOp(rand.getSparsity()));
-                                               
params.put(DataExpression.RAND_SEED, new LiteralOp(rand.getSeed()));
-                                       }
-                                       else if( rand.getOpcode().equals("seq") 
) {
-                                               params.put(Statement.SEQ_FROM, 
new LiteralOp(rand.getFrom()));
-                                               params.put(Statement.SEQ_TO, 
new LiteralOp(rand.getTo()));
-                                               params.put(Statement.SEQ_INCR, 
new LiteralOp(rand.getIncr()));
-                                       }
-                                       Hop datagen = new 
DataGenOp(OpOpDG.valueOf(rand.getOpcode().toUpperCase()),
-                                               new DataIdentifier("tmp"), 
params);
-                                       
datagen.setBlocksize(rand.getBlocksize());
-                                       operands.put(item.getId(), datagen);
-                               } else if (inst instanceof VariableCPInstruction
-                                               && ((VariableCPInstruction) 
inst).isCreateVariable()) {
-                                       String parts[] = 
InstructionUtils.getInstructionPartsWithValueType(inst.toString());
-                                       DataType dt = 
DataType.valueOf(parts[4]);
-                                       ValueType vt = dt == DataType.MATRIX ? 
ValueType.FP64 : ValueType.STRING;
-                                       HashMap<String, Hop> params = new 
HashMap<>();
-                                       params.put(DataExpression.IO_FILENAME, 
new LiteralOp(parts[2]));
-                                       params.put(DataExpression.READROWPARAM, 
new LiteralOp(Long.parseLong(parts[6])));
-                                       params.put(DataExpression.READCOLPARAM, 
new LiteralOp(Long.parseLong(parts[7])));
-                                       params.put(DataExpression.READNNZPARAM, 
new LiteralOp(Long.parseLong(parts[8])));
-                                       params.put(DataExpression.FORMAT_TYPE, 
new LiteralOp(parts[5]));
-                                       DataOp pread = new 
DataOp(parts[1].substring(5), dt, vt, OpOpData.PERSISTENTREAD, params);
-                                       pread.setFileName(parts[2]);
-                                       operands.put(item.getId(), pread);
-                               }
-                               else if  (inst instanceof RandSPInstruction) {
-                                       RandSPInstruction rand = 
(RandSPInstruction) inst;
-                                       HashMap<String, Hop> params = new 
HashMap<>();
-                                       if (rand.output.getDataType() == 
DataType.TENSOR)
-                                               
params.put(DataExpression.RAND_DIMS, new LiteralOp(rand.getDims()));
-                                       else {
-                                               
params.put(DataExpression.RAND_ROWS, new LiteralOp(rand.getRows()));
-                                               
params.put(DataExpression.RAND_COLS, new LiteralOp(rand.getCols()));
-                                       }
-                                       params.put(DataExpression.RAND_MIN, new 
LiteralOp(rand.getMinValue()));
-                                       params.put(DataExpression.RAND_MAX, new 
LiteralOp(rand.getMaxValue()));
-                                       params.put(DataExpression.RAND_PDF, new 
LiteralOp(rand.getPdf()));
-                                       params.put(DataExpression.RAND_LAMBDA, 
new LiteralOp(rand.getPdfParams()));
-                                       
params.put(DataExpression.RAND_SPARSITY, new LiteralOp(rand.getSparsity()));
-                                       params.put(DataExpression.RAND_SEED, 
new LiteralOp(rand.getSeed()));
-                                       Hop datagen = new 
DataGenOp(OpOpDG.RAND, new DataIdentifier("tmp"), params);
-                                       
datagen.setBlocksize(rand.getBlocksize());
-                                       operands.put(item.getId(), datagen);
-                               }
-                               break;
-                       }
-                       case Instruction: {
-                               CPType ctype = 
InstructionUtils.getCPTypeByOpcode(item.getOpcode());
-                               SPType stype = 
InstructionUtils.getSPTypeByOpcode(item.getOpcode());
-                               
-                               if (ctype != null) {
-                                       switch (ctype) {
-                                               case AggregateUnary: {
-                                                       Hop input = 
operands.get(item.getInputs()[0].getId());
-                                                       Hop aggunary = 
InstructionUtils.isUnaryMetadata(item.getOpcode()) ?
-                                                               
HopRewriteUtils.createUnary(input, OpOp1.valueOfByOpcode(item.getOpcode())) :
-                                                               
HopRewriteUtils.createAggUnaryOp(input, item.getOpcode());
-                                                       
operands.put(item.getId(), aggunary);
-                                                       break;
-                                               }
-                                               case AggregateBinary: {
-                                                       Hop input1 = 
operands.get(item.getInputs()[0].getId());
-                                                       Hop input2 = 
operands.get(item.getInputs()[1].getId());
-                                                       Hop aggbinary = 
HopRewriteUtils.createMatrixMultiply(input1, input2);
-                                                       
operands.put(item.getId(), aggbinary);
-                                                       break;
-                                               }
-                                               case AggregateTernary: {
-                                                       Hop input1 = 
operands.get(item.getInputs()[0].getId());
-                                                       Hop input2 = 
operands.get(item.getInputs()[1].getId());
-                                                       Hop input3 = 
operands.get(item.getInputs()[2].getId());
-                                                       Hop aggternary = 
HopRewriteUtils.createSum(
-                                                               
HopRewriteUtils.createBinary(
-                                                               
HopRewriteUtils.createBinary(input1, input2, OpOp2.MULT),
-                                                               input3, 
OpOp2.MULT));
-                                                       
operands.put(item.getId(), aggternary);
-                                                       break;
-                                               }
-                                               case Unary:
-                                               case Builtin: {
-                                                       Hop input = 
operands.get(item.getInputs()[0].getId());
-                                                       Hop unary = 
HopRewriteUtils.createUnary(input, item.getOpcode());
-                                                       
operands.put(item.getId(), unary);
-                                                       break;
-                                               }
-                                               case Reorg: {
-                                                       
operands.put(item.getId(), HopRewriteUtils.createReorg(
-                                                               
operands.get(item.getInputs()[0].getId()), item.getOpcode()));
-                                                       break;
-                                               }
-                                               case Reshape: {
-                                                       ArrayList<Hop> inputs = 
new ArrayList<>();
-                                                       for(int i=0; i<5; i++)
-                                                               
inputs.add(operands.get(item.getInputs()[i].getId()));
-                                                       
operands.put(item.getId(), HopRewriteUtils.createReorg(inputs, 
ReOrgOp.RESHAPE));
-                                                       break;
-                                               }
-                                               case Binary: {
-                                                       //handle special cases 
of binary operations 
-                                                       String opcode = 
("^2".equals(item.getOpcode()) 
-                                                               || 
"*2".equals(item.getOpcode())) ? 
-                                                               
item.getOpcode().substring(0, 1) : item.getOpcode();
-                                                       Hop input1 = 
operands.get(item.getInputs()[0].getId());
-                                                       Hop input2 = 
operands.get(item.getInputs()[1].getId());
-                                                       Hop binary = 
HopRewriteUtils.createBinary(input1, input2, opcode);
-                                                       
operands.put(item.getId(), binary);
-                                                       break;
-                                               }
-                                               case Ternary: {
-                                                       
operands.put(item.getId(), HopRewriteUtils.createTernary(
-                                                               
operands.get(item.getInputs()[0].getId()), 
-                                                               
operands.get(item.getInputs()[1].getId()), 
-                                                               
operands.get(item.getInputs()[2].getId()), item.getOpcode()));
-                                                       break;
-                                               }
-                                               case Ctable: { //e.g., ctable 
-                                                       if( 
item.getInputs().length==3 )
-                                                               
operands.put(item.getId(), HopRewriteUtils.createTernary(
-                                                                       
operands.get(item.getInputs()[0].getId()),
-                                                                       
operands.get(item.getInputs()[1].getId()),
-                                                                       
operands.get(item.getInputs()[2].getId()), OpOp3.CTABLE));
-                                                       else if( 
item.getInputs().length==5 )
-                                                               
operands.put(item.getId(), HopRewriteUtils.createTernary(
-                                                                       
operands.get(item.getInputs()[0].getId()),
-                                                                       
operands.get(item.getInputs()[1].getId()),
-                                                                       
operands.get(item.getInputs()[2].getId()),
-                                                                       
operands.get(item.getInputs()[3].getId()),
-                                                                       
operands.get(item.getInputs()[4].getId()), OpOp3.CTABLE));
-                                                       break;
-                                               }
-                                               case BuiltinNary: {
-                                                       String opcode = 
item.getOpcode().equals("n+") ? "plus" : item.getOpcode();
-                                                       
operands.put(item.getId(), HopRewriteUtils.createNary(
-                                                               
OpOpN.valueOf(opcode.toUpperCase()), createNaryInputs(item, operands)));
-                                                       break;
-                                               }
-                                               case ParameterizedBuiltin: {
-                                                       
operands.put(item.getId(), constructParameterizedBuiltinOp(item, operands));
-                                                       break;
-                                               }
-                                               case MatrixIndexing: {
-                                                       
operands.put(item.getId(), constructIndexingOp(item, operands));
-                                                       break;
-                                               }
-                                               case MMTSJ: {
-                                                       //TODO handling of tsmm 
type left and right -> placement transpose
-                                                       Hop input = 
operands.get(item.getInputs()[0].getId());
-                                                       Hop aggunary = 
HopRewriteUtils.createMatrixMultiply(
-                                                               
HopRewriteUtils.createTranspose(input), input);
-                                                       
operands.put(item.getId(), aggunary);
-                                                       break;
-                                               }
-                                               case Variable: {
-                                                       if( 
item.getOpcode().startsWith("cast") )
-                                                               
operands.put(item.getId(), HopRewriteUtils.createUnary(
-                                                                       
operands.get(item.getInputs()[0].getId()),
-                                                                       
OpOp1.valueOfByOpcode(item.getOpcode())));
-                                                       else //cpvar, write
-                                                               
operands.put(item.getId(), operands.get(item.getInputs()[0].getId()));
-                                                       break;
-                                               }
-                                               default:
-                                                       throw new 
DMLRuntimeException("Unsupported instruction "
-                                                               + "type: " + 
ctype.name() + " (" + item.getOpcode() + ").");
-                                       }
-                               }
-                               else if( stype != null ) {
-                                       switch(stype) {
-                                               case Reblock: {
-                                                       Hop input = 
operands.get(item.getInputs()[0].getId());
-                                                       
input.setBlocksize(ConfigurationManager.getBlocksize());
-                                                       
input.setRequiresReblock(true);
-                                                       
operands.put(item.getId(), input);
-                                                       break;
-                                               }
-                                               case Checkpoint: {
-                                                       Hop input = 
operands.get(item.getInputs()[0].getId());
-                                                       
operands.put(item.getId(), input);
-                                                       break;
-                                               }
-                                               case MatrixIndexing: {
-                                                       
operands.put(item.getId(), constructIndexingOp(item, operands));
-                                                       break;
-                                               }
-                                               case GAppend: {
-                                                       
operands.put(item.getId(), HopRewriteUtils.createBinary(
-                                                               
operands.get(item.getInputs()[0].getId()),
-                                                               
operands.get(item.getInputs()[1].getId()), OpOp2.CBIND));
-                                                       break;
-                                               }
-                                               default:
-                                                       throw new 
DMLRuntimeException("Unsupported instruction "
-                                                               + "type: " + 
stype.name() + " (" + item.getOpcode() + ").");
-                                       }
-                               }
-                               else
-                                       throw new 
DMLRuntimeException("Unsupported instruction: " + item.getOpcode());
-                               break;
-                       }
-                       case Literal: {
-                               CPOperand op = new CPOperand(item.getData());
-                               operands.put(item.getId(), ScalarObjectFactory
-                                       .createLiteralOp(op.getValueType(), 
op.getName()));
-                               break;
-                       }
-                       case Dedup: {
-                               throw new NotImplementedException();
-                       }
-               }
-               
-               item.setVisited();
-       }
-
        public static void constructLineageFromHops(Hop[] roots, String 
claName, Hop[] inputs, HashMap<Long, Hop> spoofmap) {
                //probe existence and only generate lineage if non-existing
                //(a fused operator might be used in multiple places of a 
program)
@@ -537,58 +233,6 @@ public class LineageItemUtils {
                root.setVisited();
        }
        
-       private static Hop constructIndexingOp(LineageItem item, Map<Long, Hop> 
operands) {
-               Hop input = operands.get(item.getInputs()[0].getId());
-               if( "rightIndex".equals(item.getOpcode()) )
-                       return HopRewriteUtils.createIndexingOp(input,
-                               operands.get(item.getInputs()[1].getId()), //rl
-                               operands.get(item.getInputs()[2].getId()), //ru
-                               operands.get(item.getInputs()[3].getId()), //cl
-                               operands.get(item.getInputs()[4].getId())); //cu
-               else if( "leftIndex".equals(item.getOpcode()) 
-                               || "mapLeftIndex".equals(item.getOpcode()) )
-                       return HopRewriteUtils.createLeftIndexingOp(input,
-                               operands.get(item.getInputs()[1].getId()), //rhs
-                               operands.get(item.getInputs()[2].getId()), //rl
-                               operands.get(item.getInputs()[3].getId()), //ru
-                               operands.get(item.getInputs()[4].getId()), //cl
-                               operands.get(item.getInputs()[5].getId())); //cu
-               throw new DMLRuntimeException("Unsupported opcode: 
"+item.getOpcode());
-       }
-       
-       private static Hop constructParameterizedBuiltinOp(LineageItem item, 
Map<Long, Hop> operands) {
-               String opcode = item.getOpcode();
-               Hop target = operands.get(item.getInputs()[0].getId());
-               LinkedHashMap<String,Hop> args = new LinkedHashMap<>();
-               if( opcode.equals("groupedagg") ) {
-                       args.put("target", target);
-                       args.put(Statement.GAGG_GROUPS, 
operands.get(item.getInputs()[1].getId()));
-                       args.put(Statement.GAGG_WEIGHTS, 
operands.get(item.getInputs()[2].getId()));
-                       args.put(Statement.GAGG_FN, 
operands.get(item.getInputs()[3].getId()));
-                       args.put(Statement.GAGG_NUM_GROUPS, 
operands.get(item.getInputs()[4].getId()));
-               }
-               else if (opcode.equalsIgnoreCase("rmempty")) {
-                       args.put("target", target);
-                       args.put("margin", 
operands.get(item.getInputs()[1].getId()));
-                       args.put("select", 
operands.get(item.getInputs()[2].getId()));
-               }
-               else if(opcode.equalsIgnoreCase("replace")) {
-                       args.put("target", target);
-                       args.put("pattern", 
operands.get(item.getInputs()[1].getId()));
-                       args.put("replacement", 
operands.get(item.getInputs()[2].getId()));
-               }
-               else if(opcode.equalsIgnoreCase("rexpand")) {
-                       args.put("target", target);
-                       args.put("max", 
operands.get(item.getInputs()[1].getId()));
-                       args.put("dir", 
operands.get(item.getInputs()[2].getId()));
-                       args.put("cast", 
operands.get(item.getInputs()[3].getId()));
-                       args.put("ignore", 
operands.get(item.getInputs()[4].getId()));
-               }
-               
-               return HopRewriteUtils.createParameterizedBuiltinOp(
-                       target, args, 
ParamBuiltinOp.valueOf(opcode.toUpperCase()));
-       }
-       
        public static LineageItem rDecompress(LineageItem item) {
                if (item.getType() == LineageItemType.Dedup) {
                        LineageItem dedupInput = 
rDecompress(item.getInputs()[0]);
@@ -654,36 +298,6 @@ public class LineageItemUtils {
                item.setVisited();
        }
        
-       public static String mergeExplainDedupBlocks(ExecutionContext ec) {
-               Map<ProgramBlock, LineageDedupBlock> dedupBlocks = 
ec.getLineage().getDedupBlocks();
-               StringBuilder sb = new StringBuilder();
-               // Gather all the DAG roots of all the paths in all the loops.
-               for (Map.Entry<ProgramBlock, LineageDedupBlock> dblock : 
dedupBlocks.entrySet()) {
-                       if (dblock.getValue() != null) {
-                               String forKey = 
dblock.getKey().getStatementBlock().getName();
-                               LineageDedupBlock dedup = dblock.getValue();
-                               for (Map.Entry<Long, LineageMap> patch : 
dedup.getPathMaps().entrySet()) {
-                                       for (Map.Entry<String, LineageItem> 
root : patch.getValue().getTraces().entrySet()) {
-                                               // Encode all the information 
in the headers that're
-                                               // needed by the 
deserialization logic.
-                                               sb.append("patch");
-                                               sb.append(DEDUP_DELIM);
-                                               sb.append(root.getKey());
-                                               sb.append(DEDUP_DELIM);
-                                               sb.append(forKey);
-                                               sb.append(DEDUP_DELIM);
-                                               sb.append(patch.getKey());
-                                               sb.append("\n");
-                                               
sb.append(Explain.explain(root.getValue()));
-                                               sb.append("\n");
-                                               
-                                       }
-                               }
-                       }
-               }
-               return sb.toString();
-       }
-       
        public static LineageItem replace(LineageItem root, LineageItem liOld, 
LineageItem liNew) {
                if( liNew == null )
                        throw new DMLRuntimeException("Invalid null lineage 
item for "+liOld.getId());
@@ -794,14 +408,6 @@ public class LineageItemUtils {
                current.setVisited();
        }
        
-       private static Hop[] createNaryInputs(LineageItem item, Map<Long, Hop> 
operands) {
-               int len = item.getInputs().length;
-               Hop[] ret = new Hop[len];
-               for( int i=0; i<len; i++ )
-                       ret[i] = operands.get(item.getInputs()[i].getId());
-               return ret;
-       }
-       
        public static boolean containsRandDataGen(HashSet<LineageItem> entries, 
LineageItem root) {
                if (entries.contains(root) || root.isVisited())
                        return false;
diff --git a/src/main/java/org/apache/sysds/runtime/lineage/LineageMap.java 
b/src/main/java/org/apache/sysds/runtime/lineage/LineageMap.java
index fdcadff..2fc63ad 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageMap.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageMap.java
@@ -83,7 +83,7 @@ public class LineageMap {
        }
 
        public void processDedupItem(LineageMap lm, Long path, LineageItem[] 
liinputs, String name) {
-               String delim = LineageItemUtils.DEDUP_DELIM;
+               String delim = LineageDedupUtils.DEDUP_DELIM;
                for (Map.Entry<String, LineageItem> entry : 
lm._traces.entrySet()) {
                        // Encode everything in the opcode needed by the 
deserialization logic
                        // to map this lineage item to the right patch.
@@ -250,10 +250,7 @@ public class LineageMap {
                
                if (DMLScript.LINEAGE_DEDUP) {
                        // gracefully serialize the dedup maps without 
decompressing
-                       
LineageItemUtils.writeTraceToHDFS(LineageItemUtils.mergeExplainDedupBlocks(ec), 
fName + ".lineage.dedup");
-                       // sample code to deserialize the dedup patches
-                       //String allDedup = 
LineageItemUtils.mergeExplainDedupBlocks(ec);
-                       //LineageItem tmp = 
LineageParser.parseLineageTraceDedup(allDedup);
+                       
LineageItemUtils.writeTraceToHDFS(LineageDedupUtils.mergeExplainDedupBlocks(ec),
 fName + ".lineage.dedup");
                }
                LineageItemUtils.writeTraceToHDFS(Explain.explain(li), fName + 
".lineage");
        }
diff --git a/src/main/java/org/apache/sysds/runtime/lineage/LineageParser.java 
b/src/main/java/org/apache/sysds/runtime/lineage/LineageParser.java
index c70cfdd..ad2f4e8 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageParser.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageParser.java
@@ -112,26 +112,23 @@ public class LineageParser
                return new LineageItem(id, "", opcode, inputs.toArray(new 
LineageItem[0]));
        }
        
-       public static LineageItem parseLineageTraceDedup(String str) {
-               LineageItem li = null;
-               Map<Long, Map<String, LineageItem>> patchLiMap = new 
HashMap<>();
+       protected static void parseLineageTraceDedup(String str) {
                str.replaceAll("\r\n", "\n");
                String[] allPatches = str.split("\n\n");
                for (String patch : allPatches) {
                        String[] headBody = patch.split("\r\n|\r|\n", 2);
-                       // Parse the header
-                       String[] parts = 
headBody[0].split(LineageItemUtils.DEDUP_DELIM);
-                       // e.g. patch_R_SB15_1
+                       // Parse the header (e.g. patch_R_SB15_1)
+                       String[] parts = 
headBody[0].split(LineageDedupUtils.DEDUP_DELIM);
                        // Deserialize the patch
                        LineageItem patchLi = parseLineageTrace(headBody[1]);
+                       //LineageItemUtils.computeByLineageDedup(patchLi);
                        Long pathId = Long.parseLong(parts[3]);
                        // Map the pathID and the DAG root name to the 
deserialized DAG.
-                       if (!patchLiMap.containsKey(pathId)) {
-                               patchLiMap.put(pathId, new HashMap<>());
+                       if 
(!LineageRecomputeUtils.patchLiMap.containsKey(pathId)) {
+                               LineageRecomputeUtils.patchLiMap.put(pathId, 
new HashMap<>());
                        }
-                       patchLiMap.get(pathId).put(parts[1], patchLi);
+                       
LineageRecomputeUtils.patchLiMap.get(pathId).put(parts[1], patchLi);
                        // TODO: handle multiple loops
                }
-               return li;
        }
 }
\ No newline at end of file
diff --git 
a/src/main/java/org/apache/sysds/runtime/lineage/LineageRecomputeUtils.java 
b/src/main/java/org/apache/sysds/runtime/lineage/LineageRecomputeUtils.java
new file mode 100644
index 0000000..8f2c226
--- /dev/null
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageRecomputeUtils.java
@@ -0,0 +1,522 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.lineage;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.sysds.api.DMLScript;
+import org.apache.sysds.common.Types.DataType;
+import org.apache.sysds.common.Types.OpOp1;
+import org.apache.sysds.common.Types.OpOp2;
+import org.apache.sysds.common.Types.OpOp3;
+import org.apache.sysds.common.Types.OpOpDG;
+import org.apache.sysds.common.Types.OpOpData;
+import org.apache.sysds.common.Types.OpOpN;
+import org.apache.sysds.common.Types.ParamBuiltinOp;
+import org.apache.sysds.common.Types.ReOrgOp;
+import org.apache.sysds.common.Types.ValueType;
+import org.apache.sysds.conf.ConfigurationManager;
+import org.apache.sysds.hops.DataGenOp;
+import org.apache.sysds.hops.DataOp;
+import org.apache.sysds.hops.FunctionOp;
+import org.apache.sysds.hops.FunctionOp.FunctionType;
+import org.apache.sysds.hops.Hop;
+import org.apache.sysds.hops.LiteralOp;
+import org.apache.sysds.hops.rewrite.HopRewriteUtils;
+import org.apache.sysds.lops.Lop;
+import org.apache.sysds.lops.compile.Dag;
+import org.apache.sysds.parser.DMLProgram;
+import org.apache.sysds.parser.DataExpression;
+import org.apache.sysds.parser.DataIdentifier;
+import org.apache.sysds.parser.Statement;
+import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.controlprogram.BasicProgramBlock;
+import org.apache.sysds.runtime.controlprogram.FunctionProgramBlock;
+import org.apache.sysds.runtime.controlprogram.Program;
+import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
+import org.apache.sysds.runtime.controlprogram.context.ExecutionContextFactory;
+import org.apache.sysds.runtime.instructions.Instruction;
+import org.apache.sysds.runtime.instructions.InstructionParser;
+import org.apache.sysds.runtime.instructions.InstructionUtils;
+import org.apache.sysds.runtime.instructions.cp.CPInstruction.CPType;
+import org.apache.sysds.runtime.instructions.cp.CPOperand;
+import org.apache.sysds.runtime.instructions.cp.Data;
+import org.apache.sysds.runtime.instructions.cp.DataGenCPInstruction;
+import org.apache.sysds.runtime.instructions.cp.ScalarObjectFactory;
+import org.apache.sysds.runtime.instructions.cp.VariableCPInstruction;
+import org.apache.sysds.runtime.instructions.spark.RandSPInstruction;
+import org.apache.sysds.runtime.instructions.spark.SPInstruction.SPType;
+import org.apache.sysds.utils.Explain;
+import org.apache.sysds.utils.Explain.ExplainCounts;
+import org.apache.sysds.utils.Statistics;
+
+public class LineageRecomputeUtils {
+       private static final String LVARPREFIX = "lvar";
+       public static final String LPLACEHOLDER = "IN#";
+       private static final boolean DEBUG = false;
+       public static final Map<Long, Map<String, LineageItem>> patchLiMap = 
new HashMap<>();
+       private static final Map<Long, Map<String, Hop>> patchHopMap = new 
HashMap<>();
+
+       public static Data parseNComputeLineageTrace(String mainTrace, String 
dedupPatches) {
+               LineageItem root = LineageParser.parseLineageTrace(mainTrace);
+               if (dedupPatches != null)
+                       LineageParser.parseLineageTraceDedup(dedupPatches);
+               Data ret = computeByLineage(root);
+               // Cleanup the statics
+               patchLiMap.clear();
+               patchHopMap.clear();
+               return ret;
+       }
+       
+       private static Data computeByLineage(LineageItem root) 
+       {
+               long rootId = root.getOpcode().equals("write") ?
+                       root.getInputs()[0].getId() : root.getId();
+               String varname = LVARPREFIX + rootId;
+               Program prog = new Program(null);
+               
+               // Recursively construct hops 
+               root.resetVisitStatusNR();
+               Map<Long, Hop> operands = new HashMap<>();
+               Map<String, Hop> partDagRoots = new HashMap<>();
+               rConstructHops(root, operands, partDagRoots, prog);
+               Hop out = HopRewriteUtils.createTransientWrite(
+                       varname, operands.get(rootId));
+               
+               // Generate instructions
+               ExecutionContext ec = ExecutionContextFactory.createContext();
+               partDagRoots.put(varname, out);
+               constructBasicBlock(partDagRoots, varname, prog);
+               
+               // Reset cache due to cleaned data objects
+               LineageCache.resetCache();
+               //execute instructions and get result
+               if (DEBUG) {
+                       DMLScript.STATISTICS = true;
+                       ExplainCounts counts = 
Explain.countDistributedOperations(prog);
+                       System.out.println(Explain.display(null, prog, 
Explain.ExplainType.RUNTIME, counts));
+               }
+               ec.setProgram(prog);
+               prog.execute(ec);
+               if (DEBUG) {
+                       Statistics.stopRunTimer();
+                       
System.out.println(Statistics.display(DMLScript.STATISTICS_COUNT));
+               }
+               return ec.getVariable(varname);
+       }
+       
+       private static void constructBasicBlock(Map<String, Hop> partDagRoots, 
String dedupOut, Program prog) {
+               Hop out = partDagRoots.get(dedupOut);
+               // Compile and save
+               BasicProgramBlock pb = new BasicProgramBlock(prog);
+               pb.setInstructions(genInst(out));
+               prog.addProgramBlock(pb);
+       }
+       
+       
+       private static void rConstructHops(LineageItem item, Map<Long, Hop> 
operands, Map<String, Hop> partDagRoots, Program prog) 
+       {
+               if (item.isVisited())
+                       return;
+               
+               //recursively process children (ordering by data dependencies)
+               if (!item.isLeaf())
+                       for (LineageItem c : item.getInputs())
+                               rConstructHops(c, operands, partDagRoots, prog);
+               
+               //process current lineage item
+               //NOTE: we generate instructions from hops (but without 
rewrites) to automatically
+               //handle execution types, rmvar instructions, and rewiring of 
inputs/outputs
+               switch (item.getType()) {
+                       case Creation: {
+                               if (item.getData().startsWith(LPLACEHOLDER)) {
+                                       long phId = 
Long.parseLong(item.getData().substring(3));
+                                       Hop input = operands.get(phId);
+                                       operands.remove(phId);
+                                       // Replace the placeholders with TReads
+                                       operands.put(item.getId(), input); // 
order preserving
+                                       break;
+                               }
+                               Instruction inst = 
InstructionParser.parseSingleInstruction(item.getData());
+                               
+                               if (inst instanceof DataGenCPInstruction) {
+                                       DataGenCPInstruction rand = 
(DataGenCPInstruction) inst;
+                                       HashMap<String, Hop> params = new 
HashMap<>();
+                                       if( rand.getOpcode().equals("rand") ) {
+                                               if( rand.output.getDataType() 
== DataType.TENSOR)
+                                                       
params.put(DataExpression.RAND_DIMS, new LiteralOp(rand.getDims()));
+                                               else {
+                                                       
params.put(DataExpression.RAND_ROWS, new LiteralOp(rand.getRows()));
+                                                       
params.put(DataExpression.RAND_COLS, new LiteralOp(rand.getCols()));
+                                               }
+                                               
params.put(DataExpression.RAND_MIN, new LiteralOp(rand.getMinValue()));
+                                               
params.put(DataExpression.RAND_MAX, new LiteralOp(rand.getMaxValue()));
+                                               
params.put(DataExpression.RAND_PDF, new LiteralOp(rand.getPdf()));
+                                               
params.put(DataExpression.RAND_LAMBDA, new LiteralOp(rand.getPdfParams()));
+                                               
params.put(DataExpression.RAND_SPARSITY, new LiteralOp(rand.getSparsity()));
+                                               
params.put(DataExpression.RAND_SEED, new LiteralOp(rand.getSeed()));
+                                       }
+                                       else if( rand.getOpcode().equals("seq") 
) {
+                                               params.put(Statement.SEQ_FROM, 
new LiteralOp(rand.getFrom()));
+                                               params.put(Statement.SEQ_TO, 
new LiteralOp(rand.getTo()));
+                                               params.put(Statement.SEQ_INCR, 
new LiteralOp(rand.getIncr()));
+                                       }
+                                       Hop datagen = new 
DataGenOp(OpOpDG.valueOf(rand.getOpcode().toUpperCase()),
+                                               new DataIdentifier("tmp"), 
params);
+                                       
datagen.setBlocksize(rand.getBlocksize());
+                                       operands.put(item.getId(), datagen);
+                               } else if (inst instanceof VariableCPInstruction
+                                               && ((VariableCPInstruction) 
inst).isCreateVariable()) {
+                                       String parts[] = 
InstructionUtils.getInstructionPartsWithValueType(inst.toString());
+                                       DataType dt = 
DataType.valueOf(parts[4]);
+                                       ValueType vt = dt == DataType.MATRIX ? 
ValueType.FP64 : ValueType.STRING;
+                                       HashMap<String, Hop> params = new 
HashMap<>();
+                                       params.put(DataExpression.IO_FILENAME, 
new LiteralOp(parts[2]));
+                                       params.put(DataExpression.READROWPARAM, 
new LiteralOp(Long.parseLong(parts[6])));
+                                       params.put(DataExpression.READCOLPARAM, 
new LiteralOp(Long.parseLong(parts[7])));
+                                       params.put(DataExpression.READNNZPARAM, 
new LiteralOp(Long.parseLong(parts[8])));
+                                       params.put(DataExpression.FORMAT_TYPE, 
new LiteralOp(parts[5]));
+                                       DataOp pread = new 
DataOp(parts[1].substring(5), dt, vt, OpOpData.PERSISTENTREAD, params);
+                                       pread.setFileName(parts[2]);
+                                       operands.put(item.getId(), pread);
+                               }
+                               else if  (inst instanceof RandSPInstruction) {
+                                       RandSPInstruction rand = 
(RandSPInstruction) inst;
+                                       HashMap<String, Hop> params = new 
HashMap<>();
+                                       if (rand.output.getDataType() == 
DataType.TENSOR)
+                                               
params.put(DataExpression.RAND_DIMS, new LiteralOp(rand.getDims()));
+                                       else {
+                                               
params.put(DataExpression.RAND_ROWS, new LiteralOp(rand.getRows()));
+                                               
params.put(DataExpression.RAND_COLS, new LiteralOp(rand.getCols()));
+                                       }
+                                       params.put(DataExpression.RAND_MIN, new 
LiteralOp(rand.getMinValue()));
+                                       params.put(DataExpression.RAND_MAX, new 
LiteralOp(rand.getMaxValue()));
+                                       params.put(DataExpression.RAND_PDF, new 
LiteralOp(rand.getPdf()));
+                                       params.put(DataExpression.RAND_LAMBDA, 
new LiteralOp(rand.getPdfParams()));
+                                       
params.put(DataExpression.RAND_SPARSITY, new LiteralOp(rand.getSparsity()));
+                                       params.put(DataExpression.RAND_SEED, 
new LiteralOp(rand.getSeed()));
+                                       Hop datagen = new 
DataGenOp(OpOpDG.RAND, new DataIdentifier("tmp"), params);
+                                       
datagen.setBlocksize(rand.getBlocksize());
+                                       operands.put(item.getId(), datagen);
+                               }
+                               break;
+                       }
+                       case Instruction: {
+                               if (item.isDedup()) {
+                                       // Create function call for each dedup 
entry 
+                                       String[] parts = 
item.getOpcode().split(LineageDedupUtils.DEDUP_DELIM); //e.g. dedup_R_SB13_0
+                                       String name = parts[2] + parts[1] + 
parts[3];  //loopId + outVar + pathId
+                                       List<Hop> finputs = 
Arrays.stream(item.getInputs())
+                                                       .map(inp -> 
operands.get(inp.getId())).collect(Collectors.toList());
+                                       String[] inputNames = new 
String[item.getInputs().length];
+                                       for (int i=0; 
i<item.getInputs().length; i++)
+                                               inputNames[i] = LPLACEHOLDER + 
i;  //e.g. IN#0, IN#1
+                                       Hop funcOp = new 
FunctionOp(FunctionType.DML, DMLProgram.DEFAULT_NAMESPACE, 
+                                                       name, inputNames, 
finputs, new String[] {parts[1]}, false);
+
+                                       // Cut the Hop dag after function calls 
+                                       partDagRoots.put(parts[1], funcOp);
+                                       // Compile the dag and save
+                                       constructBasicBlock(partDagRoots, 
parts[1], prog);
+
+                                       // Construct a Hop dag for the function 
body from the dedup patch, and compile
+                                       Hop output = 
constructHopsDedupPatch(parts, inputNames, finputs, prog);
+                                       // Create a TRead on the function o/p 
as a leaf for the next Hop dag
+                                       // Use the function body root/return 
hop to propagate right data type
+                                       operands.put(item.getId(), 
HopRewriteUtils.createTransientRead(parts[1], output));
+                                       break;
+                               }
+                               CPType ctype = 
InstructionUtils.getCPTypeByOpcode(item.getOpcode());
+                               SPType stype = 
InstructionUtils.getSPTypeByOpcode(item.getOpcode());
+                               
+                               if (ctype != null) {
+                                       switch (ctype) {
+                                               case AggregateUnary: {
+                                                       Hop input = 
operands.get(item.getInputs()[0].getId());
+                                                       Hop aggunary = 
InstructionUtils.isUnaryMetadata(item.getOpcode()) ?
+                                                               
HopRewriteUtils.createUnary(input, OpOp1.valueOfByOpcode(item.getOpcode())) :
+                                                               
HopRewriteUtils.createAggUnaryOp(input, item.getOpcode());
+                                                       
operands.put(item.getId(), aggunary);
+                                                       break;
+                                               }
+                                               case AggregateBinary: {
+                                                       Hop input1 = 
operands.get(item.getInputs()[0].getId());
+                                                       Hop input2 = 
operands.get(item.getInputs()[1].getId());
+                                                       Hop aggbinary = 
HopRewriteUtils.createMatrixMultiply(input1, input2);
+                                                       
operands.put(item.getId(), aggbinary);
+                                                       break;
+                                               }
+                                               case AggregateTernary: {
+                                                       Hop input1 = 
operands.get(item.getInputs()[0].getId());
+                                                       Hop input2 = 
operands.get(item.getInputs()[1].getId());
+                                                       Hop input3 = 
operands.get(item.getInputs()[2].getId());
+                                                       Hop aggternary = 
HopRewriteUtils.createSum(
+                                                               
HopRewriteUtils.createBinary(
+                                                               
HopRewriteUtils.createBinary(input1, input2, OpOp2.MULT),
+                                                               input3, 
OpOp2.MULT));
+                                                       
operands.put(item.getId(), aggternary);
+                                                       break;
+                                               }
+                                               case Unary:
+                                               case Builtin: {
+                                                       Hop input = 
operands.get(item.getInputs()[0].getId());
+                                                       Hop unary = 
HopRewriteUtils.createUnary(input, item.getOpcode());
+                                                       
operands.put(item.getId(), unary);
+                                                       break;
+                                               }
+                                               case Reorg: {
+                                                       
operands.put(item.getId(), HopRewriteUtils.createReorg(
+                                                               
operands.get(item.getInputs()[0].getId()), item.getOpcode()));
+                                                       break;
+                                               }
+                                               case Reshape: {
+                                                       ArrayList<Hop> inputs = 
new ArrayList<>();
+                                                       for(int i=0; i<5; i++)
+                                                               
inputs.add(operands.get(item.getInputs()[i].getId()));
+                                                       
operands.put(item.getId(), HopRewriteUtils.createReorg(inputs, 
ReOrgOp.RESHAPE));
+                                                       break;
+                                               }
+                                               case Binary: {
+                                                       //handle special cases 
of binary operations 
+                                                       String opcode = 
("^2".equals(item.getOpcode()) 
+                                                               || 
"*2".equals(item.getOpcode())) ? 
+                                                               
item.getOpcode().substring(0, 1) : item.getOpcode();
+                                                       Hop input1 = 
operands.get(item.getInputs()[0].getId());
+                                                       Hop input2 = 
operands.get(item.getInputs()[1].getId());
+                                                       Hop binary = 
HopRewriteUtils.createBinary(input1, input2, opcode);
+                                                       
operands.put(item.getId(), binary);
+                                                       break;
+                                               }
+                                               case Ternary: {
+                                                       
operands.put(item.getId(), HopRewriteUtils.createTernary(
+                                                               
operands.get(item.getInputs()[0].getId()), 
+                                                               
operands.get(item.getInputs()[1].getId()), 
+                                                               
operands.get(item.getInputs()[2].getId()), item.getOpcode()));
+                                                       break;
+                                               }
+                                               case Ctable: { //e.g., ctable 
+                                                       if( 
item.getInputs().length==3 )
+                                                               
operands.put(item.getId(), HopRewriteUtils.createTernary(
+                                                                       
operands.get(item.getInputs()[0].getId()),
+                                                                       
operands.get(item.getInputs()[1].getId()),
+                                                                       
operands.get(item.getInputs()[2].getId()), OpOp3.CTABLE));
+                                                       else if( 
item.getInputs().length==5 )
+                                                               
operands.put(item.getId(), HopRewriteUtils.createTernary(
+                                                                       
operands.get(item.getInputs()[0].getId()),
+                                                                       
operands.get(item.getInputs()[1].getId()),
+                                                                       
operands.get(item.getInputs()[2].getId()),
+                                                                       
operands.get(item.getInputs()[3].getId()),
+                                                                       
operands.get(item.getInputs()[4].getId()), OpOp3.CTABLE));
+                                                       break;
+                                               }
+                                               case BuiltinNary: {
+                                                       String opcode = 
item.getOpcode().equals("n+") ? "plus" : item.getOpcode();
+                                                       
operands.put(item.getId(), HopRewriteUtils.createNary(
+                                                               
OpOpN.valueOf(opcode.toUpperCase()), createNaryInputs(item, operands)));
+                                                       break;
+                                               }
+                                               case ParameterizedBuiltin: {
+                                                       
operands.put(item.getId(), constructParameterizedBuiltinOp(item, operands));
+                                                       break;
+                                               }
+                                               case MatrixIndexing: {
+                                                       
operands.put(item.getId(), constructIndexingOp(item, operands));
+                                                       break;
+                                               }
+                                               case MMTSJ: {
+                                                       //TODO handling of tsmm 
type left and right -> placement transpose
+                                                       Hop input = 
operands.get(item.getInputs()[0].getId());
+                                                       Hop aggunary = 
HopRewriteUtils.createMatrixMultiply(
+                                                               
HopRewriteUtils.createTranspose(input), input);
+                                                       
operands.put(item.getId(), aggunary);
+                                                       break;
+                                               }
+                                               case Variable: {
+                                                       if( 
item.getOpcode().startsWith("cast") )
+                                                               
operands.put(item.getId(), HopRewriteUtils.createUnary(
+                                                                       
operands.get(item.getInputs()[0].getId()),
+                                                                       
OpOp1.valueOfByOpcode(item.getOpcode())));
+                                                       else //cpvar, write
+                                                               
operands.put(item.getId(), operands.get(item.getInputs()[0].getId()));
+                                                       break;
+                                               }
+                                               default:
+                                                       throw new 
DMLRuntimeException("Unsupported instruction "
+                                                               + "type: " + 
ctype.name() + " (" + item.getOpcode() + ").");
+                                       }
+                               }
+                               else if( stype != null ) {
+                                       switch(stype) {
+                                               case Reblock: {
+                                                       Hop input = 
operands.get(item.getInputs()[0].getId());
+                                                       
input.setBlocksize(ConfigurationManager.getBlocksize());
+                                                       
input.setRequiresReblock(true);
+                                                       
operands.put(item.getId(), input);
+                                                       break;
+                                               }
+                                               case Checkpoint: {
+                                                       Hop input = 
operands.get(item.getInputs()[0].getId());
+                                                       
operands.put(item.getId(), input);
+                                                       break;
+                                               }
+                                               case MatrixIndexing: {
+                                                       
operands.put(item.getId(), constructIndexingOp(item, operands));
+                                                       break;
+                                               }
+                                               case GAppend: {
+                                                       
operands.put(item.getId(), HopRewriteUtils.createBinary(
+                                                               
operands.get(item.getInputs()[0].getId()),
+                                                               
operands.get(item.getInputs()[1].getId()), OpOp2.CBIND));
+                                                       break;
+                                               }
+                                               default:
+                                                       throw new 
DMLRuntimeException("Unsupported instruction "
+                                                               + "type: " + 
stype.name() + " (" + item.getOpcode() + ").");
+                                       }
+                               }
+                               else
+                                       throw new 
DMLRuntimeException("Unsupported instruction: " + item.getOpcode());
+                               break;
+                       }
+                       case Literal: {
+                               CPOperand op = new CPOperand(item.getData());
+                               operands.put(item.getId(), ScalarObjectFactory
+                                       .createLiteralOp(op.getValueType(), 
op.getName()));
+                               break;
+                       }
+                       case Dedup: {
+                               throw new NotImplementedException();
+                       }
+               }
+               
+               item.setVisited();
+       }
+
+       /**
+        * Builds the Hop DAG of one lineage dedup patch, compiles it into a
+        * single-output function, and registers that function in the given program.
+        * Patches that were compiled before are served from patchHopMap.
+        *
+        * @param parts   fields of the dedup entry; this method reads parts[1] (name of
+        *                the output variable), parts[2] (prefix of the generated function
+        *                name) and parts[3] (numeric path id)
+        * @param inputs  names of the function inputs, paired by position with inpHops
+        * @param inpHops hops that provide the function inputs
+        * @param prog    program in which the generated function is registered
+        * @return the transient-write hop at the root of the patch DAG
+        * @throws DMLRuntimeException if inputs and inpHops differ in length, or if no
+        *         lineage patch is registered for the path id and output name
+        */
+       private static Hop constructHopsDedupPatch(String[] parts, String[] inputs, List<Hop> inpHops, Program prog) {
+               // Construct and compile the function body
+               String outname = parts[1];
+               Long pathId = Long.valueOf(parts[3]);
+               // Return if this patch is already compiled
+               if (patchHopMap.containsKey(pathId) && patchHopMap.get(pathId).containsKey(outname))
+                       return patchHopMap.get(pathId).get(outname);
+
+               // Input names and input hops are paired by position below, so they must line up
+               if (inputs.length != inpHops.size())
+                       throw new DMLRuntimeException("Mismatching number of input names ("
+                               + inputs.length + ") and input hops (" + inpHops.size()
+                               + ") for dedup patch " + pathId + ".");
+               // Fail with a clear message instead of a NullPointerException on unknown patches
+               if (!patchLiMap.containsKey(pathId) || patchLiMap.get(pathId).get(outname) == null)
+                       throw new DMLRuntimeException("No lineage dedup patch found for output '"
+                               + outname + "' and path id " + pathId + ".");
+
+               // Construct a Hop dag
+               LineageItem patchRoot = patchLiMap.get(pathId).get(outname);
+               patchRoot.resetVisitStatusNR();
+               Map<Long, Hop> operands = new HashMap<>();
+               // Create TRead on the function inputs, keyed by their position
+               //FIXME: the keys of operands can be replaced inside rConstructHops
+               for (int i=0; i<inputs.length; i++)
+                       operands.put((long)i, HopRewriteUtils.createTransientRead(inputs[i], inpHops.get(i))); //order preserving
+               rConstructHops(patchRoot, operands, null, null);
+               Hop out = HopRewriteUtils.createTransientWrite(outname, operands.get(patchRoot.getId()));
+               // Remember the compiled patch so later calls with the same id/output return early
+               patchHopMap.computeIfAbsent(pathId, k -> new HashMap<>()).put(outname, out);
+
+               // Compile to instructions and save as a FunctionProgramBlock
+               List<DataIdentifier> funcInputs = new ArrayList<>(inputs.length);
+               for (int i=0; i<inputs.length; i++)
+                       funcInputs.add(new DataIdentifier(inputs[i], inpHops.get(i).getDataType(), inpHops.get(i).getValueType()));
+               List<DataIdentifier> funcOutput = new ArrayList<>(Arrays.asList(new DataIdentifier(outname)));
+               // TODO: multi-return function
+               FunctionProgramBlock fpb = new FunctionProgramBlock(prog, funcInputs, funcOutput);
+               BasicProgramBlock pb = new BasicProgramBlock(prog);
+               pb.setInstructions(genInst(out));
+               fpb.addProgramBlock(pb);
+               // The function name is the concatenation parts[2] + output name + path id
+               prog.addFunctionProgramBlock(DMLProgram.DEFAULT_NAMESPACE, parts[2]+parts[1]+parts[3], fpb);
+               //fpb.setRecompileOnce(true);
+               return out;
+       }
+       
+       /**
+        * Compiles the DAG below the given hop into runtime instructions.
+        *
+        * @param root root hop of the DAG to compile
+        * @return the instructions produced for the lop DAG of this root
+        */
+       private static ArrayList<Instruction> genInst(Hop root) {
+               Dag<Lop> lopDag = new Dag<>();
+               // constructLops() returns the lop for the root hop, which is then added to the DAG
+               root.constructLops().addToDag(lopDag);
+               return lopDag.getJobs(null, ConfigurationManager.getDMLConfig());
+       }
+
+       /**
+        * Collects the hops of all inputs of a lineage item, in input order.
+        *
+        * @param item     lineage item whose inputs are looked up
+        * @param operands already constructed hops, keyed by lineage item id
+        * @return one hop per input; an entry is null if operands has no hop for that id
+        */
+       private static Hop[] createNaryInputs(LineageItem item, Map<Long, Hop> operands) {
+               return Arrays.stream(item.getInputs())
+                       .map(in -> operands.get(in.getId()))
+                       .toArray(Hop[]::new);
+       }
+
+       /**
+        * Rebuilds a parameterized builtin hop from its lineage item.
+        * The first lineage input is the target; the remaining inputs are bound
+        * by position to the named parameters of the respective operation.
+        *
+        * @param item     lineage item of the parameterized builtin instruction
+        * @param operands already constructed hops, keyed by lineage item id
+        * @return the reconstructed parameterized builtin hop
+        * @throws DMLRuntimeException if the opcode is none of groupedagg, rmempty,
+        *         replace, or rexpand (compared case-insensitively)
+        */
+       private static Hop constructParameterizedBuiltinOp(LineageItem item, Map<Long, Hop> operands) {
+               String opcode = item.getOpcode();
+               LineageItem[] in = item.getInputs();
+               Hop target = operands.get(in[0].getId());
+               // Insertion-ordered map; the target is always the first named argument
+               LinkedHashMap<String,Hop> args = new LinkedHashMap<>();
+               args.put("target", target);
+               if( opcode.equalsIgnoreCase("groupedagg") ) {
+                       args.put(Statement.GAGG_GROUPS, operands.get(in[1].getId()));
+                       args.put(Statement.GAGG_WEIGHTS, operands.get(in[2].getId()));
+                       args.put(Statement.GAGG_FN, operands.get(in[3].getId()));
+                       args.put(Statement.GAGG_NUM_GROUPS, operands.get(in[4].getId()));
+               }
+               else if( opcode.equalsIgnoreCase("rmempty") ) {
+                       args.put("margin", operands.get(in[1].getId()));
+                       args.put("select", operands.get(in[2].getId()));
+               }
+               else if( opcode.equalsIgnoreCase("replace") ) {
+                       args.put("pattern", operands.get(in[1].getId()));
+                       args.put("replacement", operands.get(in[2].getId()));
+               }
+               else if( opcode.equalsIgnoreCase("rexpand") ) {
+                       args.put("max", operands.get(in[1].getId()));
+                       args.put("dir", operands.get(in[2].getId()));
+                       args.put("cast", operands.get(in[3].getId()));
+                       args.put("ignore", operands.get(in[4].getId()));
+               }
+               else {
+                       // Fail fast: without a known parameter layout the operator would be
+                       // created without its arguments
+                       throw new DMLRuntimeException("Unsupported opcode: "+opcode);
+               }
+               
+               return HopRewriteUtils.createParameterizedBuiltinOp(
+                       target, args, ParamBuiltinOp.valueOf(opcode.toUpperCase()));
+       }
+
+       /**
+        * Rebuilds a right- or left-indexing hop from its lineage item.
+        * Input 0 is the indexed data. For rightIndex, inputs 1-4 are the bounds
+        * rl, ru, cl, cu; for leftIndex and mapLeftIndex, input 1 is the rhs and
+        * inputs 2-5 are rl, ru, cl, cu.
+        *
+        * @param item     lineage item of the indexing instruction
+        * @param operands already constructed hops, keyed by lineage item id
+        * @return the reconstructed indexing hop
+        * @throws DMLRuntimeException if the opcode is not an indexing opcode handled here
+        */
+       private static Hop constructIndexingOp(LineageItem item, Map<Long, Hop> operands) {
+               final LineageItem[] in = item.getInputs();
+               final Hop target = operands.get(in[0].getId());
+               final String opcode = item.getOpcode();
+               final boolean right = "rightIndex".equals(opcode);
+               final boolean left = "leftIndex".equals(opcode) || "mapLeftIndex".equals(opcode);
+               if( !right && !left )
+                       throw new DMLRuntimeException("Unsupported opcode: "+opcode);
+               
+               // Gather the remaining arguments in input order: 4 for right, 5 for left indexing
+               final Hop[] rest = new Hop[right ? 4 : 5];
+               for( int i=0; i<rest.length; i++ )
+                       rest[i] = operands.get(in[i+1].getId());
+               if( right ) //rl, ru, cl, cu
+                       return HopRewriteUtils.createIndexingOp(target, rest[0], rest[1], rest[2], rest[3]);
+               //rhs, rl, ru, cl, cu
+               return HopRewriteUtils.createLeftIndexingOp(target, rest[0], rest[1], rest[2], rest[3], rest[4]);
+       }
+}
diff --git a/src/test/java/org/apache/sysds/test/AutomatedTestBase.java 
b/src/test/java/org/apache/sysds/test/AutomatedTestBase.java
index b0d66f2..58b9282 100644
--- a/src/test/java/org/apache/sysds/test/AutomatedTestBase.java
+++ b/src/test/java/org/apache/sysds/test/AutomatedTestBase.java
@@ -687,6 +687,10 @@ public abstract class AutomatedTestBase {
                return TestUtils.readDMLString(baseDirectory + OUTPUT_DIR + 
fileName + ".lineage");
        }
 
+       /**
+        * Reads the ".lineage.dedup" file that belongs to the given output file name
+        * in the output directory of the current test.
+        *
+        * @param fileName name of the output file, without the ".lineage.dedup" suffix
+        * @return the file content as returned by TestUtils.readDMLString
+        */
+       protected static String readDMLLineageDedupFromHDFS(String fileName) {
+               String dedupFile = baseDirectory + OUTPUT_DIR + fileName + ".lineage.dedup";
+               return TestUtils.readDMLString(dedupFile);
+       }
+
        protected static FrameBlock readDMLFrameFromHDFS(String fileName, 
FileFormat fmt) throws IOException {
                // read frame data from hdfs
                String strFrameFileName = baseDirectory + OUTPUT_DIR + fileName;
diff --git 
a/src/test/java/org/apache/sysds/test/functions/lineage/LineageCodegenTest.java 
b/src/test/java/org/apache/sysds/test/functions/lineage/LineageCodegenTest.java
index 6f57adc..de75ab7 100644
--- 
a/src/test/java/org/apache/sysds/test/functions/lineage/LineageCodegenTest.java
+++ 
b/src/test/java/org/apache/sysds/test/functions/lineage/LineageCodegenTest.java
@@ -29,9 +29,7 @@ import 
org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
 import org.apache.sysds.runtime.instructions.cp.Data;
 import org.apache.sysds.runtime.lineage.Lineage;
 import org.apache.sysds.runtime.lineage.LineageCacheConfig.ReuseCacheType;
-import org.apache.sysds.runtime.lineage.LineageItem;
-import org.apache.sysds.runtime.lineage.LineageItemUtils;
-import org.apache.sysds.runtime.lineage.LineageParser;
+import org.apache.sysds.runtime.lineage.LineageRecomputeUtils;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 import org.apache.sysds.runtime.matrix.data.MatrixValue.CellIndex;
 import org.apache.sysds.test.AutomatedTestBase;
@@ -103,8 +101,7 @@ public class LineageCodegenTest extends AutomatedTestBase {
                        
                        //get lineage and generate program
                        String Rtrace = readDMLLineageFromHDFS("R");
-                       LineageItem R = LineageParser.parseLineageTrace(Rtrace);
-                       Data ret = LineageItemUtils.computeByLineage(R);
+                       Data ret = 
LineageRecomputeUtils.parseNComputeLineageTrace(Rtrace, null);
                        HashMap<CellIndex, Double> dmlfile = 
readDMLMatrixFromHDFS("R");
                        MatrixBlock tmp = 
((MatrixObject)ret).acquireReadAndRelease();
                        TestUtils.compareMatrices(dmlfile, tmp, 1e-6);
diff --git 
a/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceBuiltinTest.java
 
b/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceBuiltinTest.java
index 03b7587..1dc675c 100644
--- 
a/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceBuiltinTest.java
+++ 
b/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceBuiltinTest.java
@@ -27,9 +27,7 @@ import org.junit.Test;
 import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
 import org.apache.sysds.runtime.instructions.cp.Data;
 import org.apache.sysds.runtime.lineage.Lineage;
-import org.apache.sysds.runtime.lineage.LineageItem;
-import org.apache.sysds.runtime.lineage.LineageItemUtils;
-import org.apache.sysds.runtime.lineage.LineageParser;
+import org.apache.sysds.runtime.lineage.LineageRecomputeUtils;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 import org.apache.sysds.runtime.matrix.data.MatrixValue.CellIndex;
 import org.apache.sysds.test.AutomatedTestBase;
@@ -81,8 +79,7 @@ public class LineageTraceBuiltinTest extends 
AutomatedTestBase {
                
                //get lineage and generate program
                String Rtrace = readDMLLineageFromHDFS("R");
-               LineageItem R = LineageParser.parseLineageTrace(Rtrace);
-               Data ret = LineageItemUtils.computeByLineage(R);
+               Data ret = 
LineageRecomputeUtils.parseNComputeLineageTrace(Rtrace, null);
                
                HashMap<CellIndex, Double> dmlfile = readDMLMatrixFromHDFS("R");
                MatrixBlock tmp = ((MatrixObject)ret).acquireReadAndRelease();
diff --git 
a/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceBuiltinTest.java
 
b/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceDedupExecTest.java
similarity index 66%
copy from 
src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceBuiltinTest.java
copy to 
src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceDedupExecTest.java
index 03b7587..5f729d3 100644
--- 
a/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceBuiltinTest.java
+++ 
b/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceDedupExecTest.java
@@ -27,62 +27,71 @@ import org.junit.Test;
 import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
 import org.apache.sysds.runtime.instructions.cp.Data;
 import org.apache.sysds.runtime.lineage.Lineage;
-import org.apache.sysds.runtime.lineage.LineageItem;
-import org.apache.sysds.runtime.lineage.LineageItemUtils;
-import org.apache.sysds.runtime.lineage.LineageParser;
+import org.apache.sysds.runtime.lineage.LineageRecomputeUtils;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 import org.apache.sysds.runtime.matrix.data.MatrixValue.CellIndex;
 import org.apache.sysds.test.AutomatedTestBase;
 import org.apache.sysds.test.TestConfiguration;
 import org.apache.sysds.test.TestUtils;
 
-public class LineageTraceBuiltinTest extends AutomatedTestBase {
+public class LineageTraceDedupExecTest extends AutomatedTestBase {
        
        protected static final String TEST_DIR = "functions/lineage/";
-       protected static final String TEST_NAME1 = "LineageTraceBuiltin"; 
//rand - matrix result
-       
-       protected String TEST_CLASS_DIR = TEST_DIR + 
LineageTraceBuiltinTest.class.getSimpleName() + "/";
+       protected static final String TEST_NAME1 = "LineageTraceDedupExec1";
+       protected static final String TEST_NAME10 = "LineageTraceDedupExec10";
+       protected static final String TEST_NAME2 = "LineageTraceDedupExec2";
+       protected String TEST_CLASS_DIR = TEST_DIR + 
LineageTraceDedupExecTest.class.getSimpleName() + "/";
        
        protected static final int numRecords = 10;
        protected static final int numFeatures = 5;
        
-       public LineageTraceBuiltinTest() {
-               
-       }
-       
        @Override
        public void setUp() {
                TestUtils.clearAssertionInformation();
-               addTestConfiguration( TEST_NAME1, new 
TestConfiguration(TEST_CLASS_DIR, TEST_NAME1, new String[] {"R"}) );
+               addTestConfiguration(TEST_NAME1, new 
TestConfiguration(TEST_CLASS_DIR, TEST_NAME1));
+               addTestConfiguration(TEST_NAME10, new 
TestConfiguration(TEST_CLASS_DIR, TEST_NAME10));
+               addTestConfiguration(TEST_NAME2, new 
TestConfiguration(TEST_CLASS_DIR, TEST_NAME2));
        }
        
        @Test
-       public void testLineageTraceBuiltin1() {
-               testLineageTraceBuiltin(TEST_NAME1);
+       public void testLineageTraceExec1() {
+               testLineageTraceExec(TEST_NAME1);
+       }
+
+       @Test
+       public void testLineageTraceExec10() {
+               testLineageTraceExec(TEST_NAME10);
+       }
+
+       @Test
+       public void testLineageTraceExec2() {
+               testLineageTraceExec(TEST_NAME2);
        }
        
-       private void testLineageTraceBuiltin(String testname) {
+       private void testLineageTraceExec(String testname) {
                System.out.println("------------ BEGIN " + testname + 
"------------");
                
                getAndLoadTestConfiguration(testname);
                List<String> proArgs = new ArrayList<>();
                
+               proArgs.add("-lineage");
+               proArgs.add("dedup");
+               proArgs.add("-stats");
                proArgs.add("-args");
-               proArgs.add(input("X"));
                proArgs.add(output("R"));
                proArgs.add(String.valueOf(numRecords));
                proArgs.add(String.valueOf(numFeatures));
                programArgs = proArgs.toArray(new String[proArgs.size()]);
                fullDMLScriptName = getScript();
                
-               //run the test
                Lineage.resetInternalState();
+               //run the test
                runTest(true, EXCEPTION_NOT_EXPECTED, null, -1);
                
                //get lineage and generate program
                String Rtrace = readDMLLineageFromHDFS("R");
-               LineageItem R = LineageParser.parseLineageTrace(Rtrace);
-               Data ret = LineageItemUtils.computeByLineage(R);
+               String RDedupPatches = readDMLLineageDedupFromHDFS("R");
+               Data ret = 
LineageRecomputeUtils.parseNComputeLineageTrace(Rtrace, RDedupPatches);
                
                HashMap<CellIndex, Double> dmlfile = readDMLMatrixFromHDFS("R");
                MatrixBlock tmp = ((MatrixObject)ret).acquireReadAndRelease();
diff --git 
a/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceExecSparkTest.java
 
b/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceExecSparkTest.java
index d4f6f53..8e07c21 100644
--- 
a/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceExecSparkTest.java
+++ 
b/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceExecSparkTest.java
@@ -28,8 +28,8 @@ import 
org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
 import org.apache.sysds.runtime.instructions.cp.Data;
 import org.apache.sysds.runtime.lineage.Lineage;
 import org.apache.sysds.runtime.lineage.LineageItem;
-import org.apache.sysds.runtime.lineage.LineageItemUtils;
 import org.apache.sysds.runtime.lineage.LineageParser;
+import org.apache.sysds.runtime.lineage.LineageRecomputeUtils;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 import org.apache.sysds.runtime.matrix.data.MatrixValue;
 import org.apache.sysds.test.AutomatedTestBase;
@@ -117,12 +117,12 @@ public class LineageTraceExecSparkTest extends 
AutomatedTestBase {
                        TestUtils.compareScalars(Y_lineage, 
Explain.explain(Y_li));
                        
                        //generate program
-                       Data X_data = LineageItemUtils.computeByLineage(X_li);
+                       Data X_data = 
LineageRecomputeUtils.parseNComputeLineageTrace(X_lineage, null);
                        HashMap<MatrixValue.CellIndex, Double> X_dmlfile = 
readDMLMatrixFromHDFS("X");
                        MatrixBlock X_tmp = 
((MatrixObject)X_data).acquireReadAndRelease();
                        TestUtils.compareMatrices(X_dmlfile, X_tmp, 1e-6);
                        
-                       Data Y_data = LineageItemUtils.computeByLineage(Y_li);
+                       Data Y_data = 
LineageRecomputeUtils.parseNComputeLineageTrace(Y_lineage, null);
                        HashMap<MatrixValue.CellIndex, Double> Y_dmlfile = 
readDMLMatrixFromHDFS("Y");
                        MatrixBlock Y_tmp = 
((MatrixObject)Y_data).acquireReadAndRelease();
                        TestUtils.compareMatrices(Y_dmlfile, Y_tmp, 1e-6);
diff --git 
a/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceExecTest.java
 
b/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceExecTest.java
index b2c2b1f..c1e9205 100644
--- 
a/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceExecTest.java
+++ 
b/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceExecTest.java
@@ -28,9 +28,7 @@ import 
org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
 import org.apache.sysds.runtime.instructions.cp.Data;
 import org.apache.sysds.runtime.instructions.cp.ScalarObject;
 import org.apache.sysds.runtime.lineage.Lineage;
-import org.apache.sysds.runtime.lineage.LineageItem;
-import org.apache.sysds.runtime.lineage.LineageItemUtils;
-import org.apache.sysds.runtime.lineage.LineageParser;
+import org.apache.sysds.runtime.lineage.LineageRecomputeUtils;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 import org.apache.sysds.runtime.matrix.data.MatrixValue.CellIndex;
 import org.apache.sysds.test.AutomatedTestBase;
@@ -123,8 +121,7 @@ public class LineageTraceExecTest extends AutomatedTestBase 
{
                
                //get lineage and generate program
                String Rtrace = readDMLLineageFromHDFS("R");
-               LineageItem R = LineageParser.parseLineageTrace(Rtrace);
-               Data ret = LineageItemUtils.computeByLineage(R);
+               Data ret = 
LineageRecomputeUtils.parseNComputeLineageTrace(Rtrace, null);
                
                if( testname.equals(TEST_NAME2) || testname.equals(TEST_NAME5)) 
{
                        double val1 = readDMLScalarFromHDFS("R").get(new 
CellIndex(1,1));
diff --git 
a/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceFunctionTest.java
 
b/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceFunctionTest.java
index 1aedb7b..e6ebf78 100644
--- 
a/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceFunctionTest.java
+++ 
b/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceFunctionTest.java
@@ -27,9 +27,7 @@ import org.junit.Test;
 import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
 import org.apache.sysds.runtime.instructions.cp.Data;
 import org.apache.sysds.runtime.lineage.Lineage;
-import org.apache.sysds.runtime.lineage.LineageItem;
-import org.apache.sysds.runtime.lineage.LineageItemUtils;
-import org.apache.sysds.runtime.lineage.LineageParser;
+import org.apache.sysds.runtime.lineage.LineageRecomputeUtils;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 import org.apache.sysds.runtime.matrix.data.MatrixValue.CellIndex;
 import org.apache.sysds.test.AutomatedTestBase;
@@ -89,8 +87,7 @@ public class LineageTraceFunctionTest extends 
AutomatedTestBase
                
                //get lineage and generate program
                String Rtrace = readDMLLineageFromHDFS("R");
-               LineageItem R = LineageParser.parseLineageTrace(Rtrace);
-               Data ret = LineageItemUtils.computeByLineage(R);
+               Data ret = 
LineageRecomputeUtils.parseNComputeLineageTrace(Rtrace, null);
                
                HashMap<CellIndex, Double> dmlfile = readDMLMatrixFromHDFS("R");
                MatrixBlock tmp = ((MatrixObject)ret).acquireReadAndRelease();
diff --git 
a/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceParforTest.java
 
b/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceParforTest.java
index e796825..6b9a7bd 100644
--- 
a/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceParforTest.java
+++ 
b/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceParforTest.java
@@ -28,9 +28,7 @@ import org.apache.sysds.hops.recompile.Recompiler;
 import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
 import org.apache.sysds.runtime.instructions.cp.Data;
 import org.apache.sysds.runtime.lineage.Lineage;
-import org.apache.sysds.runtime.lineage.LineageItem;
-import org.apache.sysds.runtime.lineage.LineageItemUtils;
-import org.apache.sysds.runtime.lineage.LineageParser;
+import org.apache.sysds.runtime.lineage.LineageRecomputeUtils;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 import org.apache.sysds.runtime.matrix.data.MatrixValue.CellIndex;
 import org.apache.sysds.test.AutomatedTestBase;
@@ -163,8 +161,7 @@ public class LineageTraceParforTest extends 
AutomatedTestBase {
                        
                        //get lineage and generate program
                        String Rtrace = readDMLLineageFromHDFS("R");
-                       LineageItem R = LineageParser.parseLineageTrace(Rtrace);
-                       Data ret = LineageItemUtils.computeByLineage(R);
+                       Data ret = 
LineageRecomputeUtils.parseNComputeLineageTrace(Rtrace, null);
 
                        HashMap<CellIndex, Double> dmlfile = 
readDMLMatrixFromHDFS("R");
                        MatrixBlock tmp = ((MatrixObject) 
ret).acquireReadAndRelease();
diff --git a/src/test/scripts/functions/lineage/LineageTraceDedupExec1.dml 
b/src/test/scripts/functions/lineage/LineageTraceDedupExec1.dml
new file mode 100644
index 0000000..fc557c6
--- /dev/null
+++ b/src/test/scripts/functions/lineage/LineageTraceDedupExec1.dml
@@ -0,0 +1,33 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+X = rand(rows=10, cols=5, seed=42);
+R = X;
+
+for(i in 1:2){
+  R = R + 1 / 2;
+  R = R * 3;
+  X = X - 5;
+  R = R - 5;
+}
+
+R = R * 4;
+write(R, $1, format="text");
diff --git a/src/test/scripts/functions/lineage/LineageTraceDedupExec10.dml b/src/test/scripts/functions/lineage/LineageTraceDedupExec10.dml
new file mode 100644
index 0000000..1e755df
--- /dev/null
+++ b/src/test/scripts/functions/lineage/LineageTraceDedupExec10.dml
@@ -0,0 +1,34 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+X = rand(rows=10, cols=5, seed=42);
+R = X;
+
+for (i in 1:4) {
+  R = R + 1/2;
+  if (i %% 2 == 0)
+    R = R * 3*i;
+  R = R - 5;
+}
+
+R = R * 4;
+
+write(R, $1, format="text");
diff --git a/src/test/scripts/functions/lineage/LineageTraceDedupExec2.dml b/src/test/scripts/functions/lineage/LineageTraceDedupExec2.dml
new file mode 100644
index 0000000..3fa49dd
--- /dev/null
+++ b/src/test/scripts/functions/lineage/LineageTraceDedupExec2.dml
@@ -0,0 +1,45 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+X = rand(rows=10, cols=5, seed=42);
+
+R = X;
+for(i in 1:5){ #10
+  if(i %% 2 == 1)
+    R = R + 1 / 2;
+  else
+    R = R * 3;
+
+  R = R - 5;
+
+  if (i %% 5 == 0)
+    R = t(R) %*% R;
+
+  R = R - 23
+}
+
+R = R * 3;
+
+#for (j in 1:2) {
+#  R = R * 4;
+#}
+
+write(R, $1, format="text");

Reply via email to