This is an automated email from the ASF dual-hosted git repository.

arnabp20 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/master by this push:
     new b84a2d1  [SYSTEMDS-2650] Re-computation from lineage with dedup
b84a2d1 is described below

commit b84a2d16e1796a6d68baf9e16b7ab87982c8e34d
Author: arnabp <arnab.ph...@tugraz.at>
AuthorDate: Wed Aug 19 20:45:36 2020 +0200

    [SYSTEMDS-2650] Re-computation from lineage with dedup
    
    This patch adds the following changes:
    - We now compile all the dedup patches into functions,
    - The main program places a function call for each dedupOp and calls
      the corresponding function,
    - Move the recomputation-related code to a new class,
    - Add a new test class to match the recomputed results with the
      original outputs.
    
    The current code doesn't support multiple loops. Future commits will
    add optimizations to construct multi-return functions (instead of one
    per output variable), and to compile sequences of equivalent function
    calls into loops.
---
 .../org/apache/sysds/parser/DataIdentifier.java    |   7 +
 .../sysds/runtime/lineage/LineageDedupUtils.java   |  34 ++
 .../apache/sysds/runtime/lineage/LineageItem.java  |   4 +
 .../sysds/runtime/lineage/LineageItemUtils.java    | 394 ----------------
 .../apache/sysds/runtime/lineage/LineageMap.java   |   7 +-
 .../sysds/runtime/lineage/LineageParser.java       |  17 +-
 .../runtime/lineage/LineageRecomputeUtils.java     | 522 +++++++++++++++++++++
 .../org/apache/sysds/test/AutomatedTestBase.java   |   4 +
 .../test/functions/lineage/LineageCodegenTest.java |   7 +-
 .../functions/lineage/LineageTraceBuiltinTest.java |   7 +-
 ...tinTest.java => LineageTraceDedupExecTest.java} |  47 +-
 .../lineage/LineageTraceExecSparkTest.java         |   6 +-
 .../functions/lineage/LineageTraceExecTest.java    |   7 +-
 .../lineage/LineageTraceFunctionTest.java          |   7 +-
 .../functions/lineage/LineageTraceParforTest.java  |   7 +-
 .../functions/lineage/LineageTraceDedupExec1.dml   |  33 ++
 .../functions/lineage/LineageTraceDedupExec10.dml  |  34 ++
 .../functions/lineage/LineageTraceDedupExec2.dml   |  45 ++
 18 files changed, 733 insertions(+), 456 deletions(-)

diff --git a/src/main/java/org/apache/sysds/parser/DataIdentifier.java b/src/main/java/org/apache/sysds/parser/DataIdentifier.java
index b58b0d9..22002d6 100644
--- a/src/main/java/org/apache/sysds/parser/DataIdentifier.java
+++ b/src/main/java/org/apache/sysds/parser/DataIdentifier.java
@@ -20,6 +20,7 @@
 package org.apache.sysds.parser;
 
 import org.apache.sysds.common.Types.DataType;
+import org.apache.sysds.common.Types.ValueType;
 
 public class DataIdentifier extends Identifier
 {
@@ -44,6 +45,12 @@ public class DataIdentifier extends Identifier
 		this(name);
 		_dataType = dt;
 	}
+	
+	public DataIdentifier(String name, DataType dt, ValueType vt){
+		this(name);
+		_dataType = dt;
+		_valueType = vt;
+	}
 
 	public DataIdentifier(){
 		_name = null;
diff --git a/src/main/java/org/apache/sysds/runtime/lineage/LineageDedupUtils.java b/src/main/java/org/apache/sysds/runtime/lineage/LineageDedupUtils.java
index 6acc248..230e5e1 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageDedupUtils.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageDedupUtils.java
@@ -20,6 +20,7 @@
 package org.apache.sysds.runtime.lineage;
 
 import java.util.ArrayList;
+import java.util.Map;
 
 import org.apache.sysds.runtime.controlprogram.BasicProgramBlock;
 import org.apache.sysds.runtime.controlprogram.ForProgramBlock;
@@ -28,8 +29,10 @@ import org.apache.sysds.runtime.controlprogram.IfProgramBlock;
 import org.apache.sysds.runtime.controlprogram.ProgramBlock;
 import org.apache.sysds.runtime.controlprogram.WhileProgramBlock;
 import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
+import org.apache.sysds.utils.Explain;
 
 public class LineageDedupUtils {
+	public static final String DEDUP_DELIM = "_";
 	private static Lineage _tmpLineage = null;
 	private static Lineage _mainLineage = null;
 	private static ArrayList<Long> _numDistinctPaths = new ArrayList<>();
@@ -131,7 +134,38 @@ public class LineageDedupUtils {
 		_tmpLineage.clearLineageMap();
 		_tmpLineage.clearDedupBlock();
 	}
+	
+	public static String mergeExplainDedupBlocks(ExecutionContext ec) {
+		Map<ProgramBlock, LineageDedupBlock> dedupBlocks = ec.getLineage().getDedupBlocks();
+		StringBuilder sb = new StringBuilder();
+		// Gather all the DAG roots of all the paths in all the loops.
+		for (Map.Entry<ProgramBlock, LineageDedupBlock> dblock : dedupBlocks.entrySet()) {
+			if (dblock.getValue() != null) {
+				String forKey = dblock.getKey().getStatementBlock().getName();
+				LineageDedupBlock dedup = dblock.getValue();
+				for (Map.Entry<Long, LineageMap> patch : dedup.getPathMaps().entrySet()) {
+					for (Map.Entry<String, LineageItem> root : patch.getValue().getTraces().entrySet()) {
+						// Encode all the information in the headers that're
+						// needed by the deserialization logic.
+						sb.append("patch");
+						sb.append(DEDUP_DELIM);
+						sb.append(root.getKey());
+						sb.append(DEDUP_DELIM);
+						sb.append(forKey);
+						sb.append(DEDUP_DELIM);
+						sb.append(patch.getKey());
+						sb.append("\n");
+						sb.append(Explain.explain(root.getValue()));
+						sb.append("\n");
+						
+					}
+				}
+			}
+		}
+		return sb.toString();
+	}
+	
 	//------------------------------------------------------------------------------
 	/* The below static functions help to compute the number of distinct paths
 	 * in any program block, and are used for diagnostic purposes. These will
 	 * be removed in future.
diff --git a/src/main/java/org/apache/sysds/runtime/lineage/LineageItem.java b/src/main/java/org/apache/sysds/runtime/lineage/LineageItem.java
index 2a9891a..88f2fb8 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageItem.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageItem.java
@@ -195,6 +195,10 @@ public class LineageItem {
 		return !_opcode.isEmpty();
 	}
 	
+	public boolean isDedup() {
+		return _opcode.startsWith(dedupItemOpcode);
+	}
+	
 	/**
 	 * Non-recursive equivalent of {@link #resetVisitStatus()}
 	 * for robustness with regard to stack overflow errors.
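For context, re-computation is driven entirely from the serialized traces. The condensed sketch below mirrors the flow of the new LineageTraceDedupExecTest added in this patch; the readDML*FromHDFS helpers are the test utilities from AutomatedTestBase, and the variable names are illustrative only.

    // Run the DML script with "-lineage dedup" so that both R.lineage and
    // R.lineage.dedup are written, then re-execute purely from the traces.
    String mainTrace = readDMLLineageFromHDFS("R");          // serialized lineage DAG of output R
    String dedupPatches = readDMLLineageDedupFromHDFS("R");  // serialized dedup patches (one per loop path and output)

    // Parses both traces, compiles each dedup patch into a function,
    // places a function call per dedupOp, and executes the generated program.
    Data ret = LineageRecomputeUtils.parseNComputeLineageTrace(mainTrace, dedupPatches);

    // The recomputed result should match the originally written output R.
    HashMap<CellIndex, Double> dmlfile = readDMLMatrixFromHDFS("R");
    MatrixBlock tmp = ((MatrixObject) ret).acquireReadAndRelease();
    TestUtils.compareMatrices(dmlfile, tmp, 1e-6);
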
diff --git a/src/main/java/org/apache/sysds/runtime/lineage/LineageItemUtils.java b/src/main/java/org/apache/sysds/runtime/lineage/LineageItemUtils.java index 9a96828..f40582d 100644 --- a/src/main/java/org/apache/sysds/runtime/lineage/LineageItemUtils.java +++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageItemUtils.java @@ -19,7 +19,6 @@ package org.apache.sysds.runtime.lineage; -import org.apache.commons.lang.NotImplementedException; import org.apache.commons.lang3.ArrayUtils; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocalFileSystem; @@ -30,23 +29,10 @@ import org.apache.sysds.runtime.lineage.LineageCacheConfig.ReuseCacheType; import org.apache.sysds.runtime.lineage.LineageItem.LineageItemType; import org.apache.sysds.api.DMLScript; import org.apache.sysds.common.Types.AggOp; -import org.apache.sysds.common.Types.DataType; import org.apache.sysds.common.Types.Direction; -import org.apache.sysds.common.Types.OpOp1; -import org.apache.sysds.common.Types.OpOp2; -import org.apache.sysds.common.Types.OpOp3; -import org.apache.sysds.common.Types.OpOpDG; -import org.apache.sysds.common.Types.OpOpData; -import org.apache.sysds.common.Types.OpOpN; -import org.apache.sysds.common.Types.ParamBuiltinOp; -import org.apache.sysds.common.Types.ReOrgOp; -import org.apache.sysds.common.Types.ValueType; -import org.apache.sysds.conf.ConfigurationManager; import org.apache.sysds.hops.AggBinaryOp; import org.apache.sysds.hops.AggUnaryOp; import org.apache.sysds.hops.BinaryOp; -import org.apache.sysds.hops.DataGenOp; -import org.apache.sysds.hops.DataOp; import org.apache.sysds.hops.Hop; import org.apache.sysds.hops.IndexingOp; import org.apache.sysds.hops.LiteralOp; @@ -54,41 +40,26 @@ import org.apache.sysds.hops.ReorgOp; import org.apache.sysds.hops.TernaryOp; import org.apache.sysds.hops.UnaryOp; import org.apache.sysds.hops.codegen.SpoofFusedOp; -import org.apache.sysds.hops.rewrite.HopRewriteUtils; -import org.apache.sysds.lops.Lop; import org.apache.sysds.lops.PartialAggregate; import org.apache.sysds.lops.UnaryCP; import org.apache.sysds.lops.compile.Dag; -import org.apache.sysds.parser.DataExpression; -import org.apache.sysds.parser.DataIdentifier; -import org.apache.sysds.parser.Statement; import org.apache.sysds.runtime.DMLRuntimeException; -import org.apache.sysds.runtime.controlprogram.BasicProgramBlock; -import org.apache.sysds.runtime.controlprogram.Program; -import org.apache.sysds.runtime.controlprogram.ProgramBlock; import org.apache.sysds.runtime.controlprogram.caching.CacheableData; import org.apache.sysds.runtime.controlprogram.context.ExecutionContext; -import org.apache.sysds.runtime.controlprogram.context.ExecutionContextFactory; import org.apache.sysds.runtime.instructions.Instruction; import org.apache.sysds.runtime.instructions.InstructionParser; -import org.apache.sysds.runtime.instructions.InstructionUtils; import org.apache.sysds.runtime.instructions.cp.CPOperand; import org.apache.sysds.runtime.instructions.cp.Data; import org.apache.sysds.runtime.instructions.cp.DataGenCPInstruction; import org.apache.sysds.runtime.instructions.cp.ScalarObject; -import org.apache.sysds.runtime.instructions.cp.ScalarObjectFactory; import org.apache.sysds.runtime.instructions.cp.VariableCPInstruction; -import org.apache.sysds.runtime.instructions.spark.SPInstruction.SPType; -import org.apache.sysds.runtime.instructions.cp.CPInstruction.CPType; import org.apache.sysds.runtime.util.HDFSTool; -import org.apache.sysds.utils.Explain; import java.io.IOException; 
import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; -import java.util.LinkedHashMap; import java.util.Map; import java.util.Map.Entry; import java.util.Set; @@ -97,9 +68,7 @@ import java.util.stream.Collectors; public class LineageItemUtils { - private static final String LVARPREFIX = "lvar"; public static final String LPLACEHOLDER = "IN#"; - public static final String DEDUP_DELIM = "_"; public static LineageItemType getType(String str) { if (str.length() == 1) { @@ -159,284 +128,11 @@ public class LineageItemUtils { return sb.toString().trim(); } - public static Data computeByLineage(LineageItem root) { - long rootId = root.getOpcode().equals("write") ? - root.getInputs()[0].getId() : root.getId(); - String varname = LVARPREFIX + rootId; - - //recursively construct hops - root.resetVisitStatusNR(); - Map<Long, Hop> operands = new HashMap<>(); - rConstructHops(root, operands); - Hop out = HopRewriteUtils.createTransientWrite( - varname, operands.get(rootId)); - - //generate instructions for temporary hops - ExecutionContext ec = ExecutionContextFactory.createContext(); - BasicProgramBlock pb = new BasicProgramBlock(new Program()); - Dag<Lop> dag = new Dag<>(); - Lop lops = out.constructLops(); - lops.addToDag(dag); - pb.setInstructions(dag.getJobs(null, - ConfigurationManager.getDMLConfig())); - - // reset cache due to cleaned data objects - LineageCache.resetCache(); - //execute instructions and get result - pb.execute(ec); - return ec.getVariable(varname); - } - public static LineageItem[] getLineage(ExecutionContext ec, CPOperand... operands) { return Arrays.stream(operands).filter(c -> c!=null) .map(c -> ec.getLineage().getOrCreate(c)).toArray(LineageItem[]::new); } - private static void rConstructHops(LineageItem item, Map<Long, Hop> operands) { - if (item.isVisited()) - return; - - //recursively process children (ordering by data dependencies) - if (!item.isLeaf()) - for (LineageItem c : item.getInputs()) - rConstructHops(c, operands); - - //process current lineage item - //NOTE: we generate instructions from hops (but without rewrites) to automatically - //handle execution types, rmvar instructions, and rewiring of inputs/outputs - switch (item.getType()) { - case Creation: { - Instruction inst = InstructionParser.parseSingleInstruction(item.getData()); - - if (inst instanceof DataGenCPInstruction) { - DataGenCPInstruction rand = (DataGenCPInstruction) inst; - HashMap<String, Hop> params = new HashMap<>(); - if( rand.getOpcode().equals("rand") ) { - if( rand.output.getDataType() == DataType.TENSOR) - params.put(DataExpression.RAND_DIMS, new LiteralOp(rand.getDims())); - else { - params.put(DataExpression.RAND_ROWS, new LiteralOp(rand.getRows())); - params.put(DataExpression.RAND_COLS, new LiteralOp(rand.getCols())); - } - params.put(DataExpression.RAND_MIN, new LiteralOp(rand.getMinValue())); - params.put(DataExpression.RAND_MAX, new LiteralOp(rand.getMaxValue())); - params.put(DataExpression.RAND_PDF, new LiteralOp(rand.getPdf())); - params.put(DataExpression.RAND_LAMBDA, new LiteralOp(rand.getPdfParams())); - params.put(DataExpression.RAND_SPARSITY, new LiteralOp(rand.getSparsity())); - params.put(DataExpression.RAND_SEED, new LiteralOp(rand.getSeed())); - } - else if( rand.getOpcode().equals("seq") ) { - params.put(Statement.SEQ_FROM, new LiteralOp(rand.getFrom())); - params.put(Statement.SEQ_TO, new LiteralOp(rand.getTo())); - params.put(Statement.SEQ_INCR, new LiteralOp(rand.getIncr())); - } - Hop datagen = new 
DataGenOp(OpOpDG.valueOf(rand.getOpcode().toUpperCase()), - new DataIdentifier("tmp"), params); - datagen.setBlocksize(rand.getBlocksize()); - operands.put(item.getId(), datagen); - } else if (inst instanceof VariableCPInstruction - && ((VariableCPInstruction) inst).isCreateVariable()) { - String parts[] = InstructionUtils.getInstructionPartsWithValueType(inst.toString()); - DataType dt = DataType.valueOf(parts[4]); - ValueType vt = dt == DataType.MATRIX ? ValueType.FP64 : ValueType.STRING; - HashMap<String, Hop> params = new HashMap<>(); - params.put(DataExpression.IO_FILENAME, new LiteralOp(parts[2])); - params.put(DataExpression.READROWPARAM, new LiteralOp(Long.parseLong(parts[6]))); - params.put(DataExpression.READCOLPARAM, new LiteralOp(Long.parseLong(parts[7]))); - params.put(DataExpression.READNNZPARAM, new LiteralOp(Long.parseLong(parts[8]))); - params.put(DataExpression.FORMAT_TYPE, new LiteralOp(parts[5])); - DataOp pread = new DataOp(parts[1].substring(5), dt, vt, OpOpData.PERSISTENTREAD, params); - pread.setFileName(parts[2]); - operands.put(item.getId(), pread); - } - else if (inst instanceof RandSPInstruction) { - RandSPInstruction rand = (RandSPInstruction) inst; - HashMap<String, Hop> params = new HashMap<>(); - if (rand.output.getDataType() == DataType.TENSOR) - params.put(DataExpression.RAND_DIMS, new LiteralOp(rand.getDims())); - else { - params.put(DataExpression.RAND_ROWS, new LiteralOp(rand.getRows())); - params.put(DataExpression.RAND_COLS, new LiteralOp(rand.getCols())); - } - params.put(DataExpression.RAND_MIN, new LiteralOp(rand.getMinValue())); - params.put(DataExpression.RAND_MAX, new LiteralOp(rand.getMaxValue())); - params.put(DataExpression.RAND_PDF, new LiteralOp(rand.getPdf())); - params.put(DataExpression.RAND_LAMBDA, new LiteralOp(rand.getPdfParams())); - params.put(DataExpression.RAND_SPARSITY, new LiteralOp(rand.getSparsity())); - params.put(DataExpression.RAND_SEED, new LiteralOp(rand.getSeed())); - Hop datagen = new DataGenOp(OpOpDG.RAND, new DataIdentifier("tmp"), params); - datagen.setBlocksize(rand.getBlocksize()); - operands.put(item.getId(), datagen); - } - break; - } - case Instruction: { - CPType ctype = InstructionUtils.getCPTypeByOpcode(item.getOpcode()); - SPType stype = InstructionUtils.getSPTypeByOpcode(item.getOpcode()); - - if (ctype != null) { - switch (ctype) { - case AggregateUnary: { - Hop input = operands.get(item.getInputs()[0].getId()); - Hop aggunary = InstructionUtils.isUnaryMetadata(item.getOpcode()) ? 
- HopRewriteUtils.createUnary(input, OpOp1.valueOfByOpcode(item.getOpcode())) : - HopRewriteUtils.createAggUnaryOp(input, item.getOpcode()); - operands.put(item.getId(), aggunary); - break; - } - case AggregateBinary: { - Hop input1 = operands.get(item.getInputs()[0].getId()); - Hop input2 = operands.get(item.getInputs()[1].getId()); - Hop aggbinary = HopRewriteUtils.createMatrixMultiply(input1, input2); - operands.put(item.getId(), aggbinary); - break; - } - case AggregateTernary: { - Hop input1 = operands.get(item.getInputs()[0].getId()); - Hop input2 = operands.get(item.getInputs()[1].getId()); - Hop input3 = operands.get(item.getInputs()[2].getId()); - Hop aggternary = HopRewriteUtils.createSum( - HopRewriteUtils.createBinary( - HopRewriteUtils.createBinary(input1, input2, OpOp2.MULT), - input3, OpOp2.MULT)); - operands.put(item.getId(), aggternary); - break; - } - case Unary: - case Builtin: { - Hop input = operands.get(item.getInputs()[0].getId()); - Hop unary = HopRewriteUtils.createUnary(input, item.getOpcode()); - operands.put(item.getId(), unary); - break; - } - case Reorg: { - operands.put(item.getId(), HopRewriteUtils.createReorg( - operands.get(item.getInputs()[0].getId()), item.getOpcode())); - break; - } - case Reshape: { - ArrayList<Hop> inputs = new ArrayList<>(); - for(int i=0; i<5; i++) - inputs.add(operands.get(item.getInputs()[i].getId())); - operands.put(item.getId(), HopRewriteUtils.createReorg(inputs, ReOrgOp.RESHAPE)); - break; - } - case Binary: { - //handle special cases of binary operations - String opcode = ("^2".equals(item.getOpcode()) - || "*2".equals(item.getOpcode())) ? - item.getOpcode().substring(0, 1) : item.getOpcode(); - Hop input1 = operands.get(item.getInputs()[0].getId()); - Hop input2 = operands.get(item.getInputs()[1].getId()); - Hop binary = HopRewriteUtils.createBinary(input1, input2, opcode); - operands.put(item.getId(), binary); - break; - } - case Ternary: { - operands.put(item.getId(), HopRewriteUtils.createTernary( - operands.get(item.getInputs()[0].getId()), - operands.get(item.getInputs()[1].getId()), - operands.get(item.getInputs()[2].getId()), item.getOpcode())); - break; - } - case Ctable: { //e.g., ctable - if( item.getInputs().length==3 ) - operands.put(item.getId(), HopRewriteUtils.createTernary( - operands.get(item.getInputs()[0].getId()), - operands.get(item.getInputs()[1].getId()), - operands.get(item.getInputs()[2].getId()), OpOp3.CTABLE)); - else if( item.getInputs().length==5 ) - operands.put(item.getId(), HopRewriteUtils.createTernary( - operands.get(item.getInputs()[0].getId()), - operands.get(item.getInputs()[1].getId()), - operands.get(item.getInputs()[2].getId()), - operands.get(item.getInputs()[3].getId()), - operands.get(item.getInputs()[4].getId()), OpOp3.CTABLE)); - break; - } - case BuiltinNary: { - String opcode = item.getOpcode().equals("n+") ? 
"plus" : item.getOpcode(); - operands.put(item.getId(), HopRewriteUtils.createNary( - OpOpN.valueOf(opcode.toUpperCase()), createNaryInputs(item, operands))); - break; - } - case ParameterizedBuiltin: { - operands.put(item.getId(), constructParameterizedBuiltinOp(item, operands)); - break; - } - case MatrixIndexing: { - operands.put(item.getId(), constructIndexingOp(item, operands)); - break; - } - case MMTSJ: { - //TODO handling of tsmm type left and right -> placement transpose - Hop input = operands.get(item.getInputs()[0].getId()); - Hop aggunary = HopRewriteUtils.createMatrixMultiply( - HopRewriteUtils.createTranspose(input), input); - operands.put(item.getId(), aggunary); - break; - } - case Variable: { - if( item.getOpcode().startsWith("cast") ) - operands.put(item.getId(), HopRewriteUtils.createUnary( - operands.get(item.getInputs()[0].getId()), - OpOp1.valueOfByOpcode(item.getOpcode()))); - else //cpvar, write - operands.put(item.getId(), operands.get(item.getInputs()[0].getId())); - break; - } - default: - throw new DMLRuntimeException("Unsupported instruction " - + "type: " + ctype.name() + " (" + item.getOpcode() + ")."); - } - } - else if( stype != null ) { - switch(stype) { - case Reblock: { - Hop input = operands.get(item.getInputs()[0].getId()); - input.setBlocksize(ConfigurationManager.getBlocksize()); - input.setRequiresReblock(true); - operands.put(item.getId(), input); - break; - } - case Checkpoint: { - Hop input = operands.get(item.getInputs()[0].getId()); - operands.put(item.getId(), input); - break; - } - case MatrixIndexing: { - operands.put(item.getId(), constructIndexingOp(item, operands)); - break; - } - case GAppend: { - operands.put(item.getId(), HopRewriteUtils.createBinary( - operands.get(item.getInputs()[0].getId()), - operands.get(item.getInputs()[1].getId()), OpOp2.CBIND)); - break; - } - default: - throw new DMLRuntimeException("Unsupported instruction " - + "type: " + stype.name() + " (" + item.getOpcode() + ")."); - } - } - else - throw new DMLRuntimeException("Unsupported instruction: " + item.getOpcode()); - break; - } - case Literal: { - CPOperand op = new CPOperand(item.getData()); - operands.put(item.getId(), ScalarObjectFactory - .createLiteralOp(op.getValueType(), op.getName())); - break; - } - case Dedup: { - throw new NotImplementedException(); - } - } - - item.setVisited(); - } - public static void constructLineageFromHops(Hop[] roots, String claName, Hop[] inputs, HashMap<Long, Hop> spoofmap) { //probe existence and only generate lineage if non-existing //(a fused operator might be used in multiple places of a program) @@ -537,58 +233,6 @@ public class LineageItemUtils { root.setVisited(); } - private static Hop constructIndexingOp(LineageItem item, Map<Long, Hop> operands) { - Hop input = operands.get(item.getInputs()[0].getId()); - if( "rightIndex".equals(item.getOpcode()) ) - return HopRewriteUtils.createIndexingOp(input, - operands.get(item.getInputs()[1].getId()), //rl - operands.get(item.getInputs()[2].getId()), //ru - operands.get(item.getInputs()[3].getId()), //cl - operands.get(item.getInputs()[4].getId())); //cu - else if( "leftIndex".equals(item.getOpcode()) - || "mapLeftIndex".equals(item.getOpcode()) ) - return HopRewriteUtils.createLeftIndexingOp(input, - operands.get(item.getInputs()[1].getId()), //rhs - operands.get(item.getInputs()[2].getId()), //rl - operands.get(item.getInputs()[3].getId()), //ru - operands.get(item.getInputs()[4].getId()), //cl - operands.get(item.getInputs()[5].getId())); //cu - throw new 
DMLRuntimeException("Unsupported opcode: "+item.getOpcode()); - } - - private static Hop constructParameterizedBuiltinOp(LineageItem item, Map<Long, Hop> operands) { - String opcode = item.getOpcode(); - Hop target = operands.get(item.getInputs()[0].getId()); - LinkedHashMap<String,Hop> args = new LinkedHashMap<>(); - if( opcode.equals("groupedagg") ) { - args.put("target", target); - args.put(Statement.GAGG_GROUPS, operands.get(item.getInputs()[1].getId())); - args.put(Statement.GAGG_WEIGHTS, operands.get(item.getInputs()[2].getId())); - args.put(Statement.GAGG_FN, operands.get(item.getInputs()[3].getId())); - args.put(Statement.GAGG_NUM_GROUPS, operands.get(item.getInputs()[4].getId())); - } - else if (opcode.equalsIgnoreCase("rmempty")) { - args.put("target", target); - args.put("margin", operands.get(item.getInputs()[1].getId())); - args.put("select", operands.get(item.getInputs()[2].getId())); - } - else if(opcode.equalsIgnoreCase("replace")) { - args.put("target", target); - args.put("pattern", operands.get(item.getInputs()[1].getId())); - args.put("replacement", operands.get(item.getInputs()[2].getId())); - } - else if(opcode.equalsIgnoreCase("rexpand")) { - args.put("target", target); - args.put("max", operands.get(item.getInputs()[1].getId())); - args.put("dir", operands.get(item.getInputs()[2].getId())); - args.put("cast", operands.get(item.getInputs()[3].getId())); - args.put("ignore", operands.get(item.getInputs()[4].getId())); - } - - return HopRewriteUtils.createParameterizedBuiltinOp( - target, args, ParamBuiltinOp.valueOf(opcode.toUpperCase())); - } - public static LineageItem rDecompress(LineageItem item) { if (item.getType() == LineageItemType.Dedup) { LineageItem dedupInput = rDecompress(item.getInputs()[0]); @@ -654,36 +298,6 @@ public class LineageItemUtils { item.setVisited(); } - public static String mergeExplainDedupBlocks(ExecutionContext ec) { - Map<ProgramBlock, LineageDedupBlock> dedupBlocks = ec.getLineage().getDedupBlocks(); - StringBuilder sb = new StringBuilder(); - // Gather all the DAG roots of all the paths in all the loops. - for (Map.Entry<ProgramBlock, LineageDedupBlock> dblock : dedupBlocks.entrySet()) { - if (dblock.getValue() != null) { - String forKey = dblock.getKey().getStatementBlock().getName(); - LineageDedupBlock dedup = dblock.getValue(); - for (Map.Entry<Long, LineageMap> patch : dedup.getPathMaps().entrySet()) { - for (Map.Entry<String, LineageItem> root : patch.getValue().getTraces().entrySet()) { - // Encode all the information in the headers that're - // needed by the deserialization logic. 
- sb.append("patch"); - sb.append(DEDUP_DELIM); - sb.append(root.getKey()); - sb.append(DEDUP_DELIM); - sb.append(forKey); - sb.append(DEDUP_DELIM); - sb.append(patch.getKey()); - sb.append("\n"); - sb.append(Explain.explain(root.getValue())); - sb.append("\n"); - - } - } - } - } - return sb.toString(); - } - public static LineageItem replace(LineageItem root, LineageItem liOld, LineageItem liNew) { if( liNew == null ) throw new DMLRuntimeException("Invalid null lineage item for "+liOld.getId()); @@ -794,14 +408,6 @@ public class LineageItemUtils { current.setVisited(); } - private static Hop[] createNaryInputs(LineageItem item, Map<Long, Hop> operands) { - int len = item.getInputs().length; - Hop[] ret = new Hop[len]; - for( int i=0; i<len; i++ ) - ret[i] = operands.get(item.getInputs()[i].getId()); - return ret; - } - public static boolean containsRandDataGen(HashSet<LineageItem> entries, LineageItem root) { if (entries.contains(root) || root.isVisited()) return false; diff --git a/src/main/java/org/apache/sysds/runtime/lineage/LineageMap.java b/src/main/java/org/apache/sysds/runtime/lineage/LineageMap.java index fdcadff..2fc63ad 100644 --- a/src/main/java/org/apache/sysds/runtime/lineage/LineageMap.java +++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageMap.java @@ -83,7 +83,7 @@ public class LineageMap { } public void processDedupItem(LineageMap lm, Long path, LineageItem[] liinputs, String name) { - String delim = LineageItemUtils.DEDUP_DELIM; + String delim = LineageDedupUtils.DEDUP_DELIM; for (Map.Entry<String, LineageItem> entry : lm._traces.entrySet()) { // Encode everything in the opcode needed by the deserialization logic // to map this lineage item to the right patch. @@ -250,10 +250,7 @@ public class LineageMap { if (DMLScript.LINEAGE_DEDUP) { // gracefully serialize the dedup maps without decompressing - LineageItemUtils.writeTraceToHDFS(LineageItemUtils.mergeExplainDedupBlocks(ec), fName + ".lineage.dedup"); - // sample code to deserialize the dedup patches - //String allDedup = LineageItemUtils.mergeExplainDedupBlocks(ec); - //LineageItem tmp = LineageParser.parseLineageTraceDedup(allDedup); + LineageItemUtils.writeTraceToHDFS(LineageDedupUtils.mergeExplainDedupBlocks(ec), fName + ".lineage.dedup"); } LineageItemUtils.writeTraceToHDFS(Explain.explain(li), fName + ".lineage"); } diff --git a/src/main/java/org/apache/sysds/runtime/lineage/LineageParser.java b/src/main/java/org/apache/sysds/runtime/lineage/LineageParser.java index c70cfdd..ad2f4e8 100644 --- a/src/main/java/org/apache/sysds/runtime/lineage/LineageParser.java +++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageParser.java @@ -112,26 +112,23 @@ public class LineageParser return new LineageItem(id, "", opcode, inputs.toArray(new LineageItem[0])); } - public static LineageItem parseLineageTraceDedup(String str) { - LineageItem li = null; - Map<Long, Map<String, LineageItem>> patchLiMap = new HashMap<>(); + protected static void parseLineageTraceDedup(String str) { str.replaceAll("\r\n", "\n"); String[] allPatches = str.split("\n\n"); for (String patch : allPatches) { String[] headBody = patch.split("\r\n|\r|\n", 2); - // Parse the header - String[] parts = headBody[0].split(LineageItemUtils.DEDUP_DELIM); - // e.g. patch_R_SB15_1 + // Parse the header (e.g. 
patch_R_SB15_1) + String[] parts = headBody[0].split(LineageDedupUtils.DEDUP_DELIM); // Deserialize the patch LineageItem patchLi = parseLineageTrace(headBody[1]); + //LineageItemUtils.computeByLineageDedup(patchLi); Long pathId = Long.parseLong(parts[3]); // Map the pathID and the DAG root name to the deserialized DAG. - if (!patchLiMap.containsKey(pathId)) { - patchLiMap.put(pathId, new HashMap<>()); + if (!LineageRecomputeUtils.patchLiMap.containsKey(pathId)) { + LineageRecomputeUtils.patchLiMap.put(pathId, new HashMap<>()); } - patchLiMap.get(pathId).put(parts[1], patchLi); + LineageRecomputeUtils.patchLiMap.get(pathId).put(parts[1], patchLi); // TODO: handle multiple loops } - return li; } } \ No newline at end of file diff --git a/src/main/java/org/apache/sysds/runtime/lineage/LineageRecomputeUtils.java b/src/main/java/org/apache/sysds/runtime/lineage/LineageRecomputeUtils.java new file mode 100644 index 0000000..8f2c226 --- /dev/null +++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageRecomputeUtils.java @@ -0,0 +1,522 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.sysds.runtime.lineage; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import org.apache.commons.lang.NotImplementedException; +import org.apache.sysds.api.DMLScript; +import org.apache.sysds.common.Types.DataType; +import org.apache.sysds.common.Types.OpOp1; +import org.apache.sysds.common.Types.OpOp2; +import org.apache.sysds.common.Types.OpOp3; +import org.apache.sysds.common.Types.OpOpDG; +import org.apache.sysds.common.Types.OpOpData; +import org.apache.sysds.common.Types.OpOpN; +import org.apache.sysds.common.Types.ParamBuiltinOp; +import org.apache.sysds.common.Types.ReOrgOp; +import org.apache.sysds.common.Types.ValueType; +import org.apache.sysds.conf.ConfigurationManager; +import org.apache.sysds.hops.DataGenOp; +import org.apache.sysds.hops.DataOp; +import org.apache.sysds.hops.FunctionOp; +import org.apache.sysds.hops.FunctionOp.FunctionType; +import org.apache.sysds.hops.Hop; +import org.apache.sysds.hops.LiteralOp; +import org.apache.sysds.hops.rewrite.HopRewriteUtils; +import org.apache.sysds.lops.Lop; +import org.apache.sysds.lops.compile.Dag; +import org.apache.sysds.parser.DMLProgram; +import org.apache.sysds.parser.DataExpression; +import org.apache.sysds.parser.DataIdentifier; +import org.apache.sysds.parser.Statement; +import org.apache.sysds.runtime.DMLRuntimeException; +import org.apache.sysds.runtime.controlprogram.BasicProgramBlock; +import org.apache.sysds.runtime.controlprogram.FunctionProgramBlock; +import org.apache.sysds.runtime.controlprogram.Program; +import org.apache.sysds.runtime.controlprogram.context.ExecutionContext; +import org.apache.sysds.runtime.controlprogram.context.ExecutionContextFactory; +import org.apache.sysds.runtime.instructions.Instruction; +import org.apache.sysds.runtime.instructions.InstructionParser; +import org.apache.sysds.runtime.instructions.InstructionUtils; +import org.apache.sysds.runtime.instructions.cp.CPInstruction.CPType; +import org.apache.sysds.runtime.instructions.cp.CPOperand; +import org.apache.sysds.runtime.instructions.cp.Data; +import org.apache.sysds.runtime.instructions.cp.DataGenCPInstruction; +import org.apache.sysds.runtime.instructions.cp.ScalarObjectFactory; +import org.apache.sysds.runtime.instructions.cp.VariableCPInstruction; +import org.apache.sysds.runtime.instructions.spark.RandSPInstruction; +import org.apache.sysds.runtime.instructions.spark.SPInstruction.SPType; +import org.apache.sysds.utils.Explain; +import org.apache.sysds.utils.Explain.ExplainCounts; +import org.apache.sysds.utils.Statistics; + +public class LineageRecomputeUtils { + private static final String LVARPREFIX = "lvar"; + public static final String LPLACEHOLDER = "IN#"; + private static final boolean DEBUG = false; + public static final Map<Long, Map<String, LineageItem>> patchLiMap = new HashMap<>(); + private static final Map<Long, Map<String, Hop>> patchHopMap = new HashMap<>(); + + public static Data parseNComputeLineageTrace(String mainTrace, String dedupPatches) { + LineageItem root = LineageParser.parseLineageTrace(mainTrace); + if (dedupPatches != null) + LineageParser.parseLineageTraceDedup(dedupPatches); + Data ret = computeByLineage(root); + // Cleanup the statics + patchLiMap.clear(); + patchHopMap.clear(); + return ret; + } + + private static Data computeByLineage(LineageItem root) + { + long rootId = root.getOpcode().equals("write") ? 
+ root.getInputs()[0].getId() : root.getId(); + String varname = LVARPREFIX + rootId; + Program prog = new Program(null); + + // Recursively construct hops + root.resetVisitStatusNR(); + Map<Long, Hop> operands = new HashMap<>(); + Map<String, Hop> partDagRoots = new HashMap<>(); + rConstructHops(root, operands, partDagRoots, prog); + Hop out = HopRewriteUtils.createTransientWrite( + varname, operands.get(rootId)); + + // Generate instructions + ExecutionContext ec = ExecutionContextFactory.createContext(); + partDagRoots.put(varname, out); + constructBasicBlock(partDagRoots, varname, prog); + + // Reset cache due to cleaned data objects + LineageCache.resetCache(); + //execute instructions and get result + if (DEBUG) { + DMLScript.STATISTICS = true; + ExplainCounts counts = Explain.countDistributedOperations(prog); + System.out.println(Explain.display(null, prog, Explain.ExplainType.RUNTIME, counts)); + } + ec.setProgram(prog); + prog.execute(ec); + if (DEBUG) { + Statistics.stopRunTimer(); + System.out.println(Statistics.display(DMLScript.STATISTICS_COUNT)); + } + return ec.getVariable(varname); + } + + private static void constructBasicBlock(Map<String, Hop> partDagRoots, String dedupOut, Program prog) { + Hop out = partDagRoots.get(dedupOut); + // Compile and save + BasicProgramBlock pb = new BasicProgramBlock(prog); + pb.setInstructions(genInst(out)); + prog.addProgramBlock(pb); + } + + + private static void rConstructHops(LineageItem item, Map<Long, Hop> operands, Map<String, Hop> partDagRoots, Program prog) + { + if (item.isVisited()) + return; + + //recursively process children (ordering by data dependencies) + if (!item.isLeaf()) + for (LineageItem c : item.getInputs()) + rConstructHops(c, operands, partDagRoots, prog); + + //process current lineage item + //NOTE: we generate instructions from hops (but without rewrites) to automatically + //handle execution types, rmvar instructions, and rewiring of inputs/outputs + switch (item.getType()) { + case Creation: { + if (item.getData().startsWith(LPLACEHOLDER)) { + long phId = Long.parseLong(item.getData().substring(3)); + Hop input = operands.get(phId); + operands.remove(phId); + // Replace the placeholders with TReads + operands.put(item.getId(), input); // order preserving + break; + } + Instruction inst = InstructionParser.parseSingleInstruction(item.getData()); + + if (inst instanceof DataGenCPInstruction) { + DataGenCPInstruction rand = (DataGenCPInstruction) inst; + HashMap<String, Hop> params = new HashMap<>(); + if( rand.getOpcode().equals("rand") ) { + if( rand.output.getDataType() == DataType.TENSOR) + params.put(DataExpression.RAND_DIMS, new LiteralOp(rand.getDims())); + else { + params.put(DataExpression.RAND_ROWS, new LiteralOp(rand.getRows())); + params.put(DataExpression.RAND_COLS, new LiteralOp(rand.getCols())); + } + params.put(DataExpression.RAND_MIN, new LiteralOp(rand.getMinValue())); + params.put(DataExpression.RAND_MAX, new LiteralOp(rand.getMaxValue())); + params.put(DataExpression.RAND_PDF, new LiteralOp(rand.getPdf())); + params.put(DataExpression.RAND_LAMBDA, new LiteralOp(rand.getPdfParams())); + params.put(DataExpression.RAND_SPARSITY, new LiteralOp(rand.getSparsity())); + params.put(DataExpression.RAND_SEED, new LiteralOp(rand.getSeed())); + } + else if( rand.getOpcode().equals("seq") ) { + params.put(Statement.SEQ_FROM, new LiteralOp(rand.getFrom())); + params.put(Statement.SEQ_TO, new LiteralOp(rand.getTo())); + params.put(Statement.SEQ_INCR, new LiteralOp(rand.getIncr())); + } + Hop datagen = new 
DataGenOp(OpOpDG.valueOf(rand.getOpcode().toUpperCase()), + new DataIdentifier("tmp"), params); + datagen.setBlocksize(rand.getBlocksize()); + operands.put(item.getId(), datagen); + } else if (inst instanceof VariableCPInstruction + && ((VariableCPInstruction) inst).isCreateVariable()) { + String parts[] = InstructionUtils.getInstructionPartsWithValueType(inst.toString()); + DataType dt = DataType.valueOf(parts[4]); + ValueType vt = dt == DataType.MATRIX ? ValueType.FP64 : ValueType.STRING; + HashMap<String, Hop> params = new HashMap<>(); + params.put(DataExpression.IO_FILENAME, new LiteralOp(parts[2])); + params.put(DataExpression.READROWPARAM, new LiteralOp(Long.parseLong(parts[6]))); + params.put(DataExpression.READCOLPARAM, new LiteralOp(Long.parseLong(parts[7]))); + params.put(DataExpression.READNNZPARAM, new LiteralOp(Long.parseLong(parts[8]))); + params.put(DataExpression.FORMAT_TYPE, new LiteralOp(parts[5])); + DataOp pread = new DataOp(parts[1].substring(5), dt, vt, OpOpData.PERSISTENTREAD, params); + pread.setFileName(parts[2]); + operands.put(item.getId(), pread); + } + else if (inst instanceof RandSPInstruction) { + RandSPInstruction rand = (RandSPInstruction) inst; + HashMap<String, Hop> params = new HashMap<>(); + if (rand.output.getDataType() == DataType.TENSOR) + params.put(DataExpression.RAND_DIMS, new LiteralOp(rand.getDims())); + else { + params.put(DataExpression.RAND_ROWS, new LiteralOp(rand.getRows())); + params.put(DataExpression.RAND_COLS, new LiteralOp(rand.getCols())); + } + params.put(DataExpression.RAND_MIN, new LiteralOp(rand.getMinValue())); + params.put(DataExpression.RAND_MAX, new LiteralOp(rand.getMaxValue())); + params.put(DataExpression.RAND_PDF, new LiteralOp(rand.getPdf())); + params.put(DataExpression.RAND_LAMBDA, new LiteralOp(rand.getPdfParams())); + params.put(DataExpression.RAND_SPARSITY, new LiteralOp(rand.getSparsity())); + params.put(DataExpression.RAND_SEED, new LiteralOp(rand.getSeed())); + Hop datagen = new DataGenOp(OpOpDG.RAND, new DataIdentifier("tmp"), params); + datagen.setBlocksize(rand.getBlocksize()); + operands.put(item.getId(), datagen); + } + break; + } + case Instruction: { + if (item.isDedup()) { + // Create function call for each dedup entry + String[] parts = item.getOpcode().split(LineageDedupUtils.DEDUP_DELIM); //e.g. dedup_R_SB13_0 + String name = parts[2] + parts[1] + parts[3]; //loopId + outVar + pathId + List<Hop> finputs = Arrays.stream(item.getInputs()) + .map(inp -> operands.get(inp.getId())).collect(Collectors.toList()); + String[] inputNames = new String[item.getInputs().length]; + for (int i=0; i<item.getInputs().length; i++) + inputNames[i] = LPLACEHOLDER + i; //e.g. 
IN#0, IN#1 + Hop funcOp = new FunctionOp(FunctionType.DML, DMLProgram.DEFAULT_NAMESPACE, + name, inputNames, finputs, new String[] {parts[1]}, false); + + // Cut the Hop dag after function calls + partDagRoots.put(parts[1], funcOp); + // Compile the dag and save + constructBasicBlock(partDagRoots, parts[1], prog); + + // Construct a Hop dag for the function body from the dedup patch, and compile + Hop output = constructHopsDedupPatch(parts, inputNames, finputs, prog); + // Create a TRead on the function o/p as a leaf for the next Hop dag + // Use the function body root/return hop to propagate right data type + operands.put(item.getId(), HopRewriteUtils.createTransientRead(parts[1], output)); + break; + } + CPType ctype = InstructionUtils.getCPTypeByOpcode(item.getOpcode()); + SPType stype = InstructionUtils.getSPTypeByOpcode(item.getOpcode()); + + if (ctype != null) { + switch (ctype) { + case AggregateUnary: { + Hop input = operands.get(item.getInputs()[0].getId()); + Hop aggunary = InstructionUtils.isUnaryMetadata(item.getOpcode()) ? + HopRewriteUtils.createUnary(input, OpOp1.valueOfByOpcode(item.getOpcode())) : + HopRewriteUtils.createAggUnaryOp(input, item.getOpcode()); + operands.put(item.getId(), aggunary); + break; + } + case AggregateBinary: { + Hop input1 = operands.get(item.getInputs()[0].getId()); + Hop input2 = operands.get(item.getInputs()[1].getId()); + Hop aggbinary = HopRewriteUtils.createMatrixMultiply(input1, input2); + operands.put(item.getId(), aggbinary); + break; + } + case AggregateTernary: { + Hop input1 = operands.get(item.getInputs()[0].getId()); + Hop input2 = operands.get(item.getInputs()[1].getId()); + Hop input3 = operands.get(item.getInputs()[2].getId()); + Hop aggternary = HopRewriteUtils.createSum( + HopRewriteUtils.createBinary( + HopRewriteUtils.createBinary(input1, input2, OpOp2.MULT), + input3, OpOp2.MULT)); + operands.put(item.getId(), aggternary); + break; + } + case Unary: + case Builtin: { + Hop input = operands.get(item.getInputs()[0].getId()); + Hop unary = HopRewriteUtils.createUnary(input, item.getOpcode()); + operands.put(item.getId(), unary); + break; + } + case Reorg: { + operands.put(item.getId(), HopRewriteUtils.createReorg( + operands.get(item.getInputs()[0].getId()), item.getOpcode())); + break; + } + case Reshape: { + ArrayList<Hop> inputs = new ArrayList<>(); + for(int i=0; i<5; i++) + inputs.add(operands.get(item.getInputs()[i].getId())); + operands.put(item.getId(), HopRewriteUtils.createReorg(inputs, ReOrgOp.RESHAPE)); + break; + } + case Binary: { + //handle special cases of binary operations + String opcode = ("^2".equals(item.getOpcode()) + || "*2".equals(item.getOpcode())) ? 
+ item.getOpcode().substring(0, 1) : item.getOpcode(); + Hop input1 = operands.get(item.getInputs()[0].getId()); + Hop input2 = operands.get(item.getInputs()[1].getId()); + Hop binary = HopRewriteUtils.createBinary(input1, input2, opcode); + operands.put(item.getId(), binary); + break; + } + case Ternary: { + operands.put(item.getId(), HopRewriteUtils.createTernary( + operands.get(item.getInputs()[0].getId()), + operands.get(item.getInputs()[1].getId()), + operands.get(item.getInputs()[2].getId()), item.getOpcode())); + break; + } + case Ctable: { //e.g., ctable + if( item.getInputs().length==3 ) + operands.put(item.getId(), HopRewriteUtils.createTernary( + operands.get(item.getInputs()[0].getId()), + operands.get(item.getInputs()[1].getId()), + operands.get(item.getInputs()[2].getId()), OpOp3.CTABLE)); + else if( item.getInputs().length==5 ) + operands.put(item.getId(), HopRewriteUtils.createTernary( + operands.get(item.getInputs()[0].getId()), + operands.get(item.getInputs()[1].getId()), + operands.get(item.getInputs()[2].getId()), + operands.get(item.getInputs()[3].getId()), + operands.get(item.getInputs()[4].getId()), OpOp3.CTABLE)); + break; + } + case BuiltinNary: { + String opcode = item.getOpcode().equals("n+") ? "plus" : item.getOpcode(); + operands.put(item.getId(), HopRewriteUtils.createNary( + OpOpN.valueOf(opcode.toUpperCase()), createNaryInputs(item, operands))); + break; + } + case ParameterizedBuiltin: { + operands.put(item.getId(), constructParameterizedBuiltinOp(item, operands)); + break; + } + case MatrixIndexing: { + operands.put(item.getId(), constructIndexingOp(item, operands)); + break; + } + case MMTSJ: { + //TODO handling of tsmm type left and right -> placement transpose + Hop input = operands.get(item.getInputs()[0].getId()); + Hop aggunary = HopRewriteUtils.createMatrixMultiply( + HopRewriteUtils.createTranspose(input), input); + operands.put(item.getId(), aggunary); + break; + } + case Variable: { + if( item.getOpcode().startsWith("cast") ) + operands.put(item.getId(), HopRewriteUtils.createUnary( + operands.get(item.getInputs()[0].getId()), + OpOp1.valueOfByOpcode(item.getOpcode()))); + else //cpvar, write + operands.put(item.getId(), operands.get(item.getInputs()[0].getId())); + break; + } + default: + throw new DMLRuntimeException("Unsupported instruction " + + "type: " + ctype.name() + " (" + item.getOpcode() + ")."); + } + } + else if( stype != null ) { + switch(stype) { + case Reblock: { + Hop input = operands.get(item.getInputs()[0].getId()); + input.setBlocksize(ConfigurationManager.getBlocksize()); + input.setRequiresReblock(true); + operands.put(item.getId(), input); + break; + } + case Checkpoint: { + Hop input = operands.get(item.getInputs()[0].getId()); + operands.put(item.getId(), input); + break; + } + case MatrixIndexing: { + operands.put(item.getId(), constructIndexingOp(item, operands)); + break; + } + case GAppend: { + operands.put(item.getId(), HopRewriteUtils.createBinary( + operands.get(item.getInputs()[0].getId()), + operands.get(item.getInputs()[1].getId()), OpOp2.CBIND)); + break; + } + default: + throw new DMLRuntimeException("Unsupported instruction " + + "type: " + stype.name() + " (" + item.getOpcode() + ")."); + } + } + else + throw new DMLRuntimeException("Unsupported instruction: " + item.getOpcode()); + break; + } + case Literal: { + CPOperand op = new CPOperand(item.getData()); + operands.put(item.getId(), ScalarObjectFactory + .createLiteralOp(op.getValueType(), op.getName())); + break; + } + case Dedup: { + throw new 
NotImplementedException(); + } + } + + item.setVisited(); + } + + private static Hop constructHopsDedupPatch(String[] parts, String[] inputs, List<Hop> inpHops, Program prog) { + // Construct and compile the function body + String outname = parts[1]; + Long pathId = Long.parseLong(parts[3]); + // Return if this patch is already compiled + if (patchHopMap.containsKey(pathId) && patchHopMap.get(pathId).containsKey(outname)) + return patchHopMap.get(pathId).get(outname); + + // Construct a Hop dag + LineageItem patchRoot = patchLiMap.get(pathId).get(outname); + patchRoot.resetVisitStatusNR(); + Map<Long, Hop> operands = new HashMap<>(); + // Create TRead on the function inputs + //FIXME: the keys of operands can be replaced inside rConstructHops + for (int i=0; i<inputs.length; i++) + operands.put((long)i, HopRewriteUtils.createTransientRead(inputs[i], inpHops.get(i))); //order preserving + rConstructHops(patchRoot, operands, null, null); + Hop out = HopRewriteUtils.createTransientWrite(outname, operands.get(patchRoot.getId())); + if (!patchHopMap.containsKey(pathId)) + patchHopMap.put(pathId, new HashMap<>()); + patchHopMap.get(pathId).put(outname, out); + + // Compile to instructions and save as a FunctionProgramBlock + List<DataIdentifier> funcInputs = new ArrayList<>(); + for (int i=0; i<inpHops.size(); i++) + funcInputs.add(new DataIdentifier(inputs[i], inpHops.get(i).getDataType(), inpHops.get(i).getValueType())); + List<DataIdentifier> funcOutput = new ArrayList<>(Arrays.asList(new DataIdentifier(outname))); + // TODO: multi-return function + FunctionProgramBlock fpb = new FunctionProgramBlock(prog, funcInputs, funcOutput); + BasicProgramBlock pb = new BasicProgramBlock(prog); + pb.setInstructions(genInst(out)); + fpb.addProgramBlock(pb); + prog.addFunctionProgramBlock(DMLProgram.DEFAULT_NAMESPACE, parts[2]+parts[1]+parts[3], fpb); + //fpb.setRecompileOnce(true); + return out; + } + + private static ArrayList<Instruction> genInst (Hop root) { + Dag<Lop> dag = new Dag<>(); + Lop lops = root.constructLops(); + lops.addToDag(dag); + return dag.getJobs(null, ConfigurationManager.getDMLConfig()); + } + + private static Hop[] createNaryInputs(LineageItem item, Map<Long, Hop> operands) { + int len = item.getInputs().length; + Hop[] ret = new Hop[len]; + for( int i=0; i<len; i++ ) + ret[i] = operands.get(item.getInputs()[i].getId()); + return ret; + } + + private static Hop constructParameterizedBuiltinOp(LineageItem item, Map<Long, Hop> operands) { + String opcode = item.getOpcode(); + Hop target = operands.get(item.getInputs()[0].getId()); + LinkedHashMap<String,Hop> args = new LinkedHashMap<>(); + if( opcode.equals("groupedagg") ) { + args.put("target", target); + args.put(Statement.GAGG_GROUPS, operands.get(item.getInputs()[1].getId())); + args.put(Statement.GAGG_WEIGHTS, operands.get(item.getInputs()[2].getId())); + args.put(Statement.GAGG_FN, operands.get(item.getInputs()[3].getId())); + args.put(Statement.GAGG_NUM_GROUPS, operands.get(item.getInputs()[4].getId())); + } + else if (opcode.equalsIgnoreCase("rmempty")) { + args.put("target", target); + args.put("margin", operands.get(item.getInputs()[1].getId())); + args.put("select", operands.get(item.getInputs()[2].getId())); + } + else if(opcode.equalsIgnoreCase("replace")) { + args.put("target", target); + args.put("pattern", operands.get(item.getInputs()[1].getId())); + args.put("replacement", operands.get(item.getInputs()[2].getId())); + } + else if(opcode.equalsIgnoreCase("rexpand")) { + args.put("target", target); + args.put("max", 
operands.get(item.getInputs()[1].getId())); + args.put("dir", operands.get(item.getInputs()[2].getId())); + args.put("cast", operands.get(item.getInputs()[3].getId())); + args.put("ignore", operands.get(item.getInputs()[4].getId())); + } + + return HopRewriteUtils.createParameterizedBuiltinOp( + target, args, ParamBuiltinOp.valueOf(opcode.toUpperCase())); + } + + private static Hop constructIndexingOp(LineageItem item, Map<Long, Hop> operands) { + Hop input = operands.get(item.getInputs()[0].getId()); + if( "rightIndex".equals(item.getOpcode()) ) + return HopRewriteUtils.createIndexingOp(input, + operands.get(item.getInputs()[1].getId()), //rl + operands.get(item.getInputs()[2].getId()), //ru + operands.get(item.getInputs()[3].getId()), //cl + operands.get(item.getInputs()[4].getId())); //cu + else if( "leftIndex".equals(item.getOpcode()) + || "mapLeftIndex".equals(item.getOpcode()) ) + return HopRewriteUtils.createLeftIndexingOp(input, + operands.get(item.getInputs()[1].getId()), //rhs + operands.get(item.getInputs()[2].getId()), //rl + operands.get(item.getInputs()[3].getId()), //ru + operands.get(item.getInputs()[4].getId()), //cl + operands.get(item.getInputs()[5].getId())); //cu + throw new DMLRuntimeException("Unsupported opcode: "+item.getOpcode()); + } +} diff --git a/src/test/java/org/apache/sysds/test/AutomatedTestBase.java b/src/test/java/org/apache/sysds/test/AutomatedTestBase.java index b0d66f2..58b9282 100644 --- a/src/test/java/org/apache/sysds/test/AutomatedTestBase.java +++ b/src/test/java/org/apache/sysds/test/AutomatedTestBase.java @@ -687,6 +687,10 @@ public abstract class AutomatedTestBase { return TestUtils.readDMLString(baseDirectory + OUTPUT_DIR + fileName + ".lineage"); } + protected static String readDMLLineageDedupFromHDFS(String fileName) { + return TestUtils.readDMLString(baseDirectory + OUTPUT_DIR + fileName + ".lineage.dedup"); + } + protected static FrameBlock readDMLFrameFromHDFS(String fileName, FileFormat fmt) throws IOException { // read frame data from hdfs String strFrameFileName = baseDirectory + OUTPUT_DIR + fileName; diff --git a/src/test/java/org/apache/sysds/test/functions/lineage/LineageCodegenTest.java b/src/test/java/org/apache/sysds/test/functions/lineage/LineageCodegenTest.java index 6f57adc..de75ab7 100644 --- a/src/test/java/org/apache/sysds/test/functions/lineage/LineageCodegenTest.java +++ b/src/test/java/org/apache/sysds/test/functions/lineage/LineageCodegenTest.java @@ -29,9 +29,7 @@ import org.apache.sysds.runtime.controlprogram.caching.MatrixObject; import org.apache.sysds.runtime.instructions.cp.Data; import org.apache.sysds.runtime.lineage.Lineage; import org.apache.sysds.runtime.lineage.LineageCacheConfig.ReuseCacheType; -import org.apache.sysds.runtime.lineage.LineageItem; -import org.apache.sysds.runtime.lineage.LineageItemUtils; -import org.apache.sysds.runtime.lineage.LineageParser; +import org.apache.sysds.runtime.lineage.LineageRecomputeUtils; import org.apache.sysds.runtime.matrix.data.MatrixBlock; import org.apache.sysds.runtime.matrix.data.MatrixValue.CellIndex; import org.apache.sysds.test.AutomatedTestBase; @@ -103,8 +101,7 @@ public class LineageCodegenTest extends AutomatedTestBase { //get lineage and generate program String Rtrace = readDMLLineageFromHDFS("R"); - LineageItem R = LineageParser.parseLineageTrace(Rtrace); - Data ret = LineageItemUtils.computeByLineage(R); + Data ret = LineageRecomputeUtils.parseNComputeLineageTrace(Rtrace, null); HashMap<CellIndex, Double> dmlfile = readDMLMatrixFromHDFS("R"); 
 		MatrixBlock tmp = ((MatrixObject)ret).acquireReadAndRelease();
 		TestUtils.compareMatrices(dmlfile, tmp, 1e-6);
diff --git a/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceBuiltinTest.java b/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceBuiltinTest.java
index 03b7587..1dc675c 100644
--- a/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceBuiltinTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceBuiltinTest.java
@@ -27,9 +27,7 @@ import org.junit.Test;
 import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
 import org.apache.sysds.runtime.instructions.cp.Data;
 import org.apache.sysds.runtime.lineage.Lineage;
-import org.apache.sysds.runtime.lineage.LineageItem;
-import org.apache.sysds.runtime.lineage.LineageItemUtils;
-import org.apache.sysds.runtime.lineage.LineageParser;
+import org.apache.sysds.runtime.lineage.LineageRecomputeUtils;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 import org.apache.sysds.runtime.matrix.data.MatrixValue.CellIndex;
 import org.apache.sysds.test.AutomatedTestBase;
@@ -81,8 +79,7 @@ public class LineageTraceBuiltinTest extends AutomatedTestBase {
 		
 		//get lineage and generate program
 		String Rtrace = readDMLLineageFromHDFS("R");
-		LineageItem R = LineageParser.parseLineageTrace(Rtrace);
-		Data ret = LineageItemUtils.computeByLineage(R);
+		Data ret = LineageRecomputeUtils.parseNComputeLineageTrace(Rtrace, null);
 		
 		HashMap<CellIndex, Double> dmlfile = readDMLMatrixFromHDFS("R");
 		MatrixBlock tmp = ((MatrixObject)ret).acquireReadAndRelease();
diff --git a/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceBuiltinTest.java b/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceDedupExecTest.java
similarity index 66%
copy from src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceBuiltinTest.java
copy to src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceDedupExecTest.java
index 03b7587..5f729d3 100644
--- a/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceBuiltinTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceDedupExecTest.java
@@ -27,62 +27,71 @@ import org.junit.Test;
 import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
 import org.apache.sysds.runtime.instructions.cp.Data;
 import org.apache.sysds.runtime.lineage.Lineage;
-import org.apache.sysds.runtime.lineage.LineageItem;
-import org.apache.sysds.runtime.lineage.LineageItemUtils;
-import org.apache.sysds.runtime.lineage.LineageParser;
+import org.apache.sysds.runtime.lineage.LineageRecomputeUtils;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 import org.apache.sysds.runtime.matrix.data.MatrixValue.CellIndex;
 import org.apache.sysds.test.AutomatedTestBase;
 import org.apache.sysds.test.TestConfiguration;
 import org.apache.sysds.test.TestUtils;
 
-public class LineageTraceBuiltinTest extends AutomatedTestBase {
+public class LineageTraceDedupExecTest extends AutomatedTestBase {
 	
 	protected static final String TEST_DIR = "functions/lineage/";
-	protected static final String TEST_NAME1 = "LineageTraceBuiltin"; //rand - matrix result
-	
-	protected String TEST_CLASS_DIR = TEST_DIR + LineageTraceBuiltinTest.class.getSimpleName() + "/";
+	protected static final String TEST_NAME1 = "LineageTraceDedupExec1";
+	protected static final String TEST_NAME10 = "LineageTraceDedupExec10";
+	protected static final String TEST_NAME2 = "LineageTraceDedupExec2";
+	protected String TEST_CLASS_DIR = TEST_DIR + LineageTraceDedupExecTest.class.getSimpleName() + "/";
 	
 	protected static final int numRecords = 10;
 	protected static final int numFeatures = 5;
 	
-	public LineageTraceBuiltinTest() {
-		
-	}
-	
 	@Override
 	public void setUp() {
 		TestUtils.clearAssertionInformation();
-		addTestConfiguration( TEST_NAME1, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME1, new String[] {"R"}) );
+		addTestConfiguration(TEST_NAME1, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME1));
+		addTestConfiguration(TEST_NAME10, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME10));
+		addTestConfiguration(TEST_NAME2, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME2));
 	}
 	
 	@Test
-	public void testLineageTraceBuiltin1() {
-		testLineageTraceBuiltin(TEST_NAME1);
+	public void testLineageTraceExec1() {
+		testLineageTraceExec(TEST_NAME1);
+	}
+	
+	@Test
+	public void testLineageTraceExec10() {
+		testLineageTraceExec(TEST_NAME10);
+	}
+	
+	@Test
+	public void testLineageTraceExec2() {
+		testLineageTraceExec(TEST_NAME2);
 	}
 	
-	private void testLineageTraceBuiltin(String testname) {
+	private void testLineageTraceExec(String testname) {
 		System.out.println("------------ BEGIN " + testname + "------------");
 		
 		getAndLoadTestConfiguration(testname);
 		List<String> proArgs = new ArrayList<>();
+		proArgs.add("-lineage");
+		proArgs.add("dedup");
+		proArgs.add("-stats");
 		proArgs.add("-args");
-		proArgs.add(input("X"));
 		proArgs.add(output("R"));
 		proArgs.add(String.valueOf(numRecords));
 		proArgs.add(String.valueOf(numFeatures));
 		programArgs = proArgs.toArray(new String[proArgs.size()]);
 		fullDMLScriptName = getScript();
-		//run the test
 		Lineage.resetInternalState();
+		//run the test
 		runTest(true, EXCEPTION_NOT_EXPECTED, null, -1);
 		
 		//get lineage and generate program
 		String Rtrace = readDMLLineageFromHDFS("R");
-		LineageItem R = LineageParser.parseLineageTrace(Rtrace);
-		Data ret = LineageItemUtils.computeByLineage(R);
+		String RDedupPatches = readDMLLineageDedupFromHDFS("R");
+		Data ret = LineageRecomputeUtils.parseNComputeLineageTrace(Rtrace, RDedupPatches);
 		
 		HashMap<CellIndex, Double> dmlfile = readDMLMatrixFromHDFS("R");
 		MatrixBlock tmp = ((MatrixObject)ret).acquireReadAndRelease();
diff --git a/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceExecSparkTest.java b/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceExecSparkTest.java
index d4f6f53..8e07c21 100644
--- a/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceExecSparkTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceExecSparkTest.java
@@ -28,8 +28,8 @@ import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
 import org.apache.sysds.runtime.instructions.cp.Data;
 import org.apache.sysds.runtime.lineage.Lineage;
 import org.apache.sysds.runtime.lineage.LineageItem;
-import org.apache.sysds.runtime.lineage.LineageItemUtils;
 import org.apache.sysds.runtime.lineage.LineageParser;
+import org.apache.sysds.runtime.lineage.LineageRecomputeUtils;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 import org.apache.sysds.runtime.matrix.data.MatrixValue;
 import org.apache.sysds.test.AutomatedTestBase;
@@ -117,12 +117,12 @@ public class LineageTraceExecSparkTest extends AutomatedTestBase {
 		TestUtils.compareScalars(Y_lineage, Explain.explain(Y_li));
 		
 		//generate program
-		Data X_data = LineageItemUtils.computeByLineage(X_li);
+		Data X_data = LineageRecomputeUtils.parseNComputeLineageTrace(X_lineage, null);
 		HashMap<MatrixValue.CellIndex, Double> X_dmlfile = readDMLMatrixFromHDFS("X");
 		MatrixBlock X_tmp = ((MatrixObject)X_data).acquireReadAndRelease();
 		TestUtils.compareMatrices(X_dmlfile, X_tmp, 1e-6);
 		
-		Data Y_data = LineageItemUtils.computeByLineage(Y_li);
+		Data Y_data = LineageRecomputeUtils.parseNComputeLineageTrace(Y_lineage, null);
 		HashMap<MatrixValue.CellIndex, Double> Y_dmlfile = readDMLMatrixFromHDFS("Y");
 		MatrixBlock Y_tmp = ((MatrixObject)Y_data).acquireReadAndRelease();
 		TestUtils.compareMatrices(Y_dmlfile, Y_tmp, 1e-6);
diff --git a/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceExecTest.java b/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceExecTest.java
index b2c2b1f..c1e9205 100644
--- a/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceExecTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceExecTest.java
@@ -28,9 +28,7 @@ import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
 import org.apache.sysds.runtime.instructions.cp.Data;
 import org.apache.sysds.runtime.instructions.cp.ScalarObject;
 import org.apache.sysds.runtime.lineage.Lineage;
-import org.apache.sysds.runtime.lineage.LineageItem;
-import org.apache.sysds.runtime.lineage.LineageItemUtils;
-import org.apache.sysds.runtime.lineage.LineageParser;
+import org.apache.sysds.runtime.lineage.LineageRecomputeUtils;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 import org.apache.sysds.runtime.matrix.data.MatrixValue.CellIndex;
 import org.apache.sysds.test.AutomatedTestBase;
@@ -123,8 +121,7 @@ public class LineageTraceExecTest extends AutomatedTestBase {
 		
 		//get lineage and generate program
 		String Rtrace = readDMLLineageFromHDFS("R");
-		LineageItem R = LineageParser.parseLineageTrace(Rtrace);
-		Data ret = LineageItemUtils.computeByLineage(R);
+		Data ret = LineageRecomputeUtils.parseNComputeLineageTrace(Rtrace, null);
 		
 		if( testname.equals(TEST_NAME2) || testname.equals(TEST_NAME5)) {
 			double val1 = readDMLScalarFromHDFS("R").get(new CellIndex(1,1));
diff --git a/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceFunctionTest.java b/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceFunctionTest.java
index 1aedb7b..e6ebf78 100644
--- a/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceFunctionTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceFunctionTest.java
@@ -27,9 +27,7 @@ import org.junit.Test;
 import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
 import org.apache.sysds.runtime.instructions.cp.Data;
 import org.apache.sysds.runtime.lineage.Lineage;
-import org.apache.sysds.runtime.lineage.LineageItem;
-import org.apache.sysds.runtime.lineage.LineageItemUtils;
-import org.apache.sysds.runtime.lineage.LineageParser;
+import org.apache.sysds.runtime.lineage.LineageRecomputeUtils;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 import org.apache.sysds.runtime.matrix.data.MatrixValue.CellIndex;
 import org.apache.sysds.test.AutomatedTestBase;
@@ -89,8 +87,7 @@ public class LineageTraceFunctionTest extends AutomatedTestBase
 		
 		//get lineage and generate program
 		String Rtrace = readDMLLineageFromHDFS("R");
-		LineageItem R = LineageParser.parseLineageTrace(Rtrace);
-		Data ret = LineageItemUtils.computeByLineage(R);
+		Data ret = LineageRecomputeUtils.parseNComputeLineageTrace(Rtrace, null);
 		
 		HashMap<CellIndex, Double> dmlfile = readDMLMatrixFromHDFS("R");
 		MatrixBlock tmp = ((MatrixObject)ret).acquireReadAndRelease();
diff --git a/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceParforTest.java b/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceParforTest.java
index e796825..6b9a7bd 100644
--- a/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceParforTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceParforTest.java
@@ -28,9 +28,7 @@ import org.apache.sysds.hops.recompile.Recompiler;
 import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
 import org.apache.sysds.runtime.instructions.cp.Data;
 import org.apache.sysds.runtime.lineage.Lineage;
-import org.apache.sysds.runtime.lineage.LineageItem;
-import org.apache.sysds.runtime.lineage.LineageItemUtils;
-import org.apache.sysds.runtime.lineage.LineageParser;
+import org.apache.sysds.runtime.lineage.LineageRecomputeUtils;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 import org.apache.sysds.runtime.matrix.data.MatrixValue.CellIndex;
 import org.apache.sysds.test.AutomatedTestBase;
@@ -163,8 +161,7 @@ public class LineageTraceParforTest extends AutomatedTestBase {
 		
 		//get lineage and generate program
 		String Rtrace = readDMLLineageFromHDFS("R");
-		LineageItem R = LineageParser.parseLineageTrace(Rtrace);
-		Data ret = LineageItemUtils.computeByLineage(R);
+		Data ret = LineageRecomputeUtils.parseNComputeLineageTrace(Rtrace, null);
 		
 		HashMap<CellIndex, Double> dmlfile = readDMLMatrixFromHDFS("R");
 		MatrixBlock tmp = ((MatrixObject) ret).acquireReadAndRelease();
diff --git a/src/test/scripts/functions/lineage/LineageTraceDedupExec1.dml b/src/test/scripts/functions/lineage/LineageTraceDedupExec1.dml
new file mode 100644
index 0000000..fc557c6
--- /dev/null
+++ b/src/test/scripts/functions/lineage/LineageTraceDedupExec1.dml
@@ -0,0 +1,33 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+X = rand(rows=10, cols=5, seed=42);
+R = X;
+
+for(i in 1:2){
+  R = R + 1 / 2;
+  R = R * 3;
+  X = X - 5;
+  R = R - 5;
+}
+
+R = R * 4;
+write(R, $1, format="text");
diff --git a/src/test/scripts/functions/lineage/LineageTraceDedupExec10.dml b/src/test/scripts/functions/lineage/LineageTraceDedupExec10.dml
new file mode 100644
index 0000000..1e755df
--- /dev/null
+++ b/src/test/scripts/functions/lineage/LineageTraceDedupExec10.dml
@@ -0,0 +1,34 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+X = rand(rows=10, cols=5, seed=42);
+R = X;
+
+for (i in 1:4) {
+  R = R + 1/2;
+  if (i %% 2 == 0)
+    R = R * 3*i;
+  R = R - 5;
+}
+
+R = R * 4;
+
+write(R, $1, format="text");
diff --git a/src/test/scripts/functions/lineage/LineageTraceDedupExec2.dml b/src/test/scripts/functions/lineage/LineageTraceDedupExec2.dml
new file mode 100644
index 0000000..3fa49dd
--- /dev/null
+++ b/src/test/scripts/functions/lineage/LineageTraceDedupExec2.dml
@@ -0,0 +1,45 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+X = rand(rows=10, cols=5, seed=42);
+
+R = X;
+for(i in 1:5){ #10
+  if(i %% 2 == 1)
+    R = R + 1 / 2;
+  else
+    R = R * 3;
+
+  R = R - 5;
+
+  if (i %% 5 == 0)
+    R = t(R) %*% R;
+
+  R = R - 23
+}
+
+R = R * 3;
+
+#for (j in 1:2) {
+#  R = R * 4;
+#}
+
+write(R, $1, format="text");
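For readers skimming the diff: each of the modified tests reduces the recompute-from-lineage check to the same few calls. The fragment below is a minimal sketch of that flow, not part of the patch itself; it assumes an AutomatedTestBase subclass (with the imports shown in LineageTraceDedupExecTest above) in which runTest(...) has already written the output R, its lineage trace, and, for dedup runs, the dedup patches. The helper name recomputeAndCompare and its dedup flag are illustrative only.

	//sketch only, not part of the patch; assumes an AutomatedTestBase subclass after runTest(...)
	private void recomputeAndCompare(boolean dedup) {
		//read the serialized lineage trace of R and, for dedup runs, the dedup patches
		String Rtrace = readDMLLineageFromHDFS("R");
		String RDedupPatches = dedup ? readDMLLineageDedupFromHDFS("R") : null;
		//parse the trace (and patches), generate a runtime program, and re-execute it
		Data ret = LineageRecomputeUtils.parseNComputeLineageTrace(Rtrace, RDedupPatches);
		//compare the recomputed matrix against the output written by the original run
		HashMap<CellIndex, Double> dmlfile = readDMLMatrixFromHDFS("R");
		MatrixBlock tmp = ((MatrixObject) ret).acquireReadAndRelease();
		TestUtils.compareMatrices(dmlfile, tmp, 1e-6);
	}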