This is an automated email from the ASF dual-hosted git repository.
mboehm7 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemml.git
The following commit(s) were added to refs/heads/master by this push:
new bd0b319 [SYSTEMDS-74] Cleanup lineage tracing (unnecessary variable
names)
bd0b319 is described below
commit bd0b319df52215b359c04590ed4091ad136ea4f9
Author: Matthias Boehm <[email protected]>
AuthorDate: Sat May 16 17:00:59 2020 +0200
[SYSTEMDS-74] Cleanup lineage tracing (unnecessary variable names)
This patch removes unnecessary attributes from lineage items in order to
reduce the size (and GC overhead) for long lineage traces. So far, each
lineage item kept the variable name to which it was bound. As lineage
information should be independent of such properties, this information
was already ignored for lineage hashing and comparisons. In a few places,
however, we used it to rewire placeholders, which is now cleaned up.
---
.../cp/AggregateUnaryCPInstruction.java | 9 +--
.../instructions/cp/ComputationCPInstruction.java | 8 ++-
.../instructions/cp/DataGenCPInstruction.java | 6 +-
.../instructions/cp/ListIndexingCPInstruction.java | 7 +-
.../instructions/cp/MatrixAppendCPInstruction.java | 7 +-
.../cp/MatrixBuiltinNaryCPInstruction.java | 7 +-
.../cp/MatrixIndexingCPInstruction.java | 7 +-
.../cp/MultiReturnBuiltinCPInstruction.java | 15 ++--
.../cp/ParameterizedBuiltinCPInstruction.java | 16 ++---
.../cp/ScalarBuiltinNaryCPInstruction.java | 8 +--
.../instructions/cp/SpoofCPInstruction.java | 9 +--
.../instructions/cp/VariableCPInstruction.java | 41 +++++------
.../fed/ComputationFEDInstruction.java | 7 +-
.../spark/BuiltinNarySPInstruction.java | 7 +-
.../spark/ComputationSPInstruction.java | 7 +-
.../spark/MatrixIndexingSPInstruction.java | 7 +-
.../instructions/spark/RandSPInstruction.java | 5 +-
.../instructions/spark/WriteSPInstruction.java | 5 +-
.../apache/sysds/runtime/lineage/LineageCache.java | 6 +-
.../apache/sysds/runtime/lineage/LineageItem.java | 81 ++++++++--------------
.../sysds/runtime/lineage/LineageItemUtils.java | 37 ++++++----
.../apache/sysds/runtime/lineage/LineageMap.java | 71 ++++++++++---------
.../sysds/runtime/lineage/LineageParser.java | 15 ++--
.../sysds/runtime/lineage/LineageRewriteReuse.java | 58 ++++++++--------
.../sysds/runtime/lineage/LineageTraceable.java | 32 ++++++++-
.../test/functions/lineage/LineageReadTest.java | 2 +-
.../test/functions/lineage/LineageRewriteTest.java | 4 +-
27 files changed, 251 insertions(+), 233 deletions(-)
diff --git
a/src/main/java/org/apache/sysds/runtime/instructions/cp/AggregateUnaryCPInstruction.java
b/src/main/java/org/apache/sysds/runtime/instructions/cp/AggregateUnaryCPInstruction.java
index 7c52737..5f053e9 100644
---
a/src/main/java/org/apache/sysds/runtime/instructions/cp/AggregateUnaryCPInstruction.java
+++
b/src/main/java/org/apache/sysds/runtime/instructions/cp/AggregateUnaryCPInstruction.java
@@ -147,12 +147,9 @@ public class AggregateUnaryCPInstruction extends
UnaryCPInstruction
throw new DMLRuntimeException("Lineage
trace "
+ "for variable
"+input1.getName()+" unavailable.");
- LineageItem li = DMLScript.LINEAGE_DEDUP ?
-
LineageItemUtils.rDecompress(ec.getLineageItem(input1)) :
- ec.getLineageItem(input1);
-
- ec.setScalarOutput(output_name, new
StringObject(
- Explain.explain(li)));
+ LineageItem li = !DMLScript.LINEAGE_DEDUP ?
ec.getLineageItem(input1):
+
LineageItemUtils.rDecompress(ec.getLineageItem(input1));
+ ec.setScalarOutput(output_name, new
StringObject(Explain.explain(li)));
break;
}
default: {
diff --git
a/src/main/java/org/apache/sysds/runtime/instructions/cp/ComputationCPInstruction.java
b/src/main/java/org/apache/sysds/runtime/instructions/cp/ComputationCPInstruction.java
index 3eecb80..a1c3568 100644
---
a/src/main/java/org/apache/sysds/runtime/instructions/cp/ComputationCPInstruction.java
+++
b/src/main/java/org/apache/sysds/runtime/instructions/cp/ComputationCPInstruction.java
@@ -19,6 +19,7 @@
package org.apache.sysds.runtime.instructions.cp;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.sysds.api.DMLScript;
import org.apache.sysds.common.Types.ExecMode;
import org.apache.sysds.hops.OptimizerUtils;
@@ -30,6 +31,7 @@ import org.apache.sysds.runtime.lineage.LineageItemUtils;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
import org.apache.sysds.runtime.matrix.operators.Operator;
+
public abstract class ComputationCPInstruction extends CPInstruction
implements LineageTraceable {
public final CPOperand output;
@@ -76,8 +78,8 @@ public abstract class ComputationCPInstruction extends
CPInstruction implements
}
@Override
- public LineageItem[] getLineageItems(ExecutionContext ec) {
- return new LineageItem[]{new LineageItem(output.getName(),
- getOpcode(), LineageItemUtils.getLineage(ec,
input1,input2,input3))};
+ public Pair<String, LineageItem> getLineageItem(ExecutionContext ec) {
+ return Pair.of(output.getName(),
+ new LineageItem(getOpcode(),
LineageItemUtils.getLineage(ec, input1,input2,input3)));
}
}
diff --git
a/src/main/java/org/apache/sysds/runtime/instructions/cp/DataGenCPInstruction.java
b/src/main/java/org/apache/sysds/runtime/instructions/cp/DataGenCPInstruction.java
index 7a37608..cdea819 100644
---
a/src/main/java/org/apache/sysds/runtime/instructions/cp/DataGenCPInstruction.java
+++
b/src/main/java/org/apache/sysds/runtime/instructions/cp/DataGenCPInstruction.java
@@ -19,6 +19,7 @@
package org.apache.sysds.runtime.instructions.cp;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.sysds.common.Types.DataType;
import org.apache.sysds.common.Types.OpOpDG;
import org.apache.sysds.common.Types.ValueType;
@@ -38,6 +39,7 @@ import org.apache.sysds.runtime.matrix.operators.Operator;
import org.apache.sysds.runtime.util.DataConverter;
import org.apache.sysds.runtime.util.UtilFunctions;
+
public class DataGenCPInstruction extends UnaryCPInstruction {
private OpOpDG method;
@@ -369,7 +371,7 @@ public class DataGenCPInstruction extends
UnaryCPInstruction {
}
@Override
- public LineageItem[] getLineageItems(ExecutionContext ec) {
+ public Pair<String, LineageItem> getLineageItem(ExecutionContext ec) {
String tmpInstStr = instString;
if (getSeed() == DataGenOp.UNSPECIFIED_SEED) {
//generate pseudo-random seed (because not specified)
@@ -381,6 +383,6 @@ public class DataGenCPInstruction extends
UnaryCPInstruction {
tmpInstStr = position != 0 ?
InstructionUtils.replaceOperand(
tmpInstStr, position,
String.valueOf(runtimeSeed)) : tmpInstStr;
}
- return new LineageItem[]{new LineageItem(output.getName(),
tmpInstStr, getOpcode())};
+ return Pair.of(output.getName(), new LineageItem(tmpInstStr,
getOpcode()));
}
}
diff --git
a/src/main/java/org/apache/sysds/runtime/instructions/cp/ListIndexingCPInstruction.java
b/src/main/java/org/apache/sysds/runtime/instructions/cp/ListIndexingCPInstruction.java
index 523eceb..bf45f8c 100644
---
a/src/main/java/org/apache/sysds/runtime/instructions/cp/ListIndexingCPInstruction.java
+++
b/src/main/java/org/apache/sysds/runtime/instructions/cp/ListIndexingCPInstruction.java
@@ -21,6 +21,7 @@ package org.apache.sysds.runtime.instructions.cp;
import org.apache.sysds.lops.LeftIndex;
import org.apache.sysds.lops.RightIndex;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.sysds.common.Types.ValueType;
import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.controlprogram.caching.CacheableData;
@@ -96,8 +97,8 @@ public final class ListIndexingCPInstruction extends
IndexingCPInstruction {
throw new DMLRuntimeException("Invalid opcode (" +
opcode +") encountered in ListIndexingCPInstruction.");
}
@Override
- public LineageItem[] getLineageItems(ExecutionContext ec) {
- return new LineageItem[]{new LineageItem(output.getName(),
getOpcode(),
- LineageItemUtils.getLineage(ec,
input1,input2,input3,rowLower,rowUpper))};
+ public Pair<String, LineageItem> getLineageItem(ExecutionContext ec) {
+ return Pair.of(output.getName(), new LineageItem(getOpcode(),
+ LineageItemUtils.getLineage(ec,
input1,input2,input3,rowLower,rowUpper)));
}
}
diff --git
a/src/main/java/org/apache/sysds/runtime/instructions/cp/MatrixAppendCPInstruction.java
b/src/main/java/org/apache/sysds/runtime/instructions/cp/MatrixAppendCPInstruction.java
index 077e584..ea178bb 100644
---
a/src/main/java/org/apache/sysds/runtime/instructions/cp/MatrixAppendCPInstruction.java
+++
b/src/main/java/org/apache/sysds/runtime/instructions/cp/MatrixAppendCPInstruction.java
@@ -19,6 +19,7 @@
package org.apache.sysds.runtime.instructions.cp;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
import org.apache.sysds.runtime.lineage.LineageItem;
@@ -56,10 +57,10 @@ public final class MatrixAppendCPInstruction extends
AppendCPInstruction impleme
}
@Override
- public LineageItem[] getLineageItems(ExecutionContext ec) {
+ public Pair<String, LineageItem> getLineageItem(ExecutionContext ec) {
//TODO: break append to cbind and rbind for full compilation
chain
String opcode = _type.toString().toLowerCase();
- return new LineageItem[]{new LineageItem(output.getName(),
- opcode, LineageItemUtils.getLineage(ec, input1,
input2))};
+ return Pair.of(output.getName(),
+ new LineageItem(opcode, LineageItemUtils.getLineage(ec,
input1, input2)));
}
}
diff --git
a/src/main/java/org/apache/sysds/runtime/instructions/cp/MatrixBuiltinNaryCPInstruction.java
b/src/main/java/org/apache/sysds/runtime/instructions/cp/MatrixBuiltinNaryCPInstruction.java
index ef8c9d7..07ab96d 100644
---
a/src/main/java/org/apache/sysds/runtime/instructions/cp/MatrixBuiltinNaryCPInstruction.java
+++
b/src/main/java/org/apache/sysds/runtime/instructions/cp/MatrixBuiltinNaryCPInstruction.java
@@ -22,6 +22,7 @@ package org.apache.sysds.runtime.instructions.cp;
import java.util.List;
import org.apache.commons.lang.ArrayUtils;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
import org.apache.sysds.runtime.lineage.LineageItem;
@@ -69,8 +70,8 @@ public class MatrixBuiltinNaryCPInstruction extends
BuiltinNaryCPInstruction imp
}
@Override
- public LineageItem[] getLineageItems(ExecutionContext ec) {
- return new LineageItem[]{new LineageItem(output.getName(),
- getOpcode(), LineageItemUtils.getLineage(ec, inputs))};
+ public Pair<String, LineageItem> getLineageItem(ExecutionContext ec) {
+ return Pair.of(output.getName(),
+ new LineageItem(getOpcode(),
LineageItemUtils.getLineage(ec, inputs)));
}
}
diff --git
a/src/main/java/org/apache/sysds/runtime/instructions/cp/MatrixIndexingCPInstruction.java
b/src/main/java/org/apache/sysds/runtime/instructions/cp/MatrixIndexingCPInstruction.java
index a892c56..d640313 100644
---
a/src/main/java/org/apache/sysds/runtime/instructions/cp/MatrixIndexingCPInstruction.java
+++
b/src/main/java/org/apache/sysds/runtime/instructions/cp/MatrixIndexingCPInstruction.java
@@ -19,6 +19,7 @@
package org.apache.sysds.runtime.instructions.cp;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.sysds.api.DMLScript;
import org.apache.sysds.lops.LeftIndex;
import org.apache.sysds.lops.RightIndex;
@@ -121,8 +122,8 @@ public final class MatrixIndexingCPInstruction extends
IndexingCPInstruction {
}
@Override
- public LineageItem[] getLineageItems(ExecutionContext ec) {
- return new LineageItem[]{new LineageItem(output.getName(),
getOpcode(),
- LineageItemUtils.getLineage(ec,
input1,input2,input3,rowLower,rowUpper,colLower,colUpper))};
+ public Pair<String, LineageItem> getLineageItem(ExecutionContext ec) {
+ return Pair.of(output.getName(), new LineageItem(getOpcode(),
+ LineageItemUtils.getLineage(ec,
input1,input2,input3,rowLower,rowUpper,colLower,colUpper)));
}
}
diff --git
a/src/main/java/org/apache/sysds/runtime/instructions/cp/MultiReturnBuiltinCPInstruction.java
b/src/main/java/org/apache/sysds/runtime/instructions/cp/MultiReturnBuiltinCPInstruction.java
index 919ad07..2e23fee 100644
---
a/src/main/java/org/apache/sysds/runtime/instructions/cp/MultiReturnBuiltinCPInstruction.java
+++
b/src/main/java/org/apache/sysds/runtime/instructions/cp/MultiReturnBuiltinCPInstruction.java
@@ -21,6 +21,7 @@ package org.apache.sysds.runtime.instructions.cp;
import java.util.ArrayList;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.sysds.common.Types.DataType;
import org.apache.sysds.common.Types.ValueType;
import org.apache.sysds.runtime.DMLRuntimeException;
@@ -111,11 +112,17 @@ public class MultiReturnBuiltinCPInstruction extends
ComputationCPInstruction {
}
@Override
- public LineageItem[] getLineageItems(ExecutionContext ec) {
+ public boolean hasSingleLineage() {
+ return false;
+ }
+
+ @Override
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ public Pair[] getLineageItems(ExecutionContext ec) {
LineageItem[] inputLineage = LineageItemUtils.getLineage(ec,
input1,input2,input3);
- ArrayList<LineageItem> items = new ArrayList<>();
+ ArrayList<Pair> items = new ArrayList<>();
for (CPOperand out : _outputs)
- items.add(new LineageItem(out.getName(), getOpcode(),
inputLineage));
- return items.toArray(new LineageItem[items.size()]);
+ items.add(Pair.of(out.getName(), new
LineageItem(getOpcode(), inputLineage)));
+ return items.toArray(new Pair[items.size()]);
}
}
diff --git
a/src/main/java/org/apache/sysds/runtime/instructions/cp/ParameterizedBuiltinCPInstruction.java
b/src/main/java/org/apache/sysds/runtime/instructions/cp/ParameterizedBuiltinCPInstruction.java
index 99ce686..9c92ddf 100644
---
a/src/main/java/org/apache/sysds/runtime/instructions/cp/ParameterizedBuiltinCPInstruction.java
+++
b/src/main/java/org/apache/sysds/runtime/instructions/cp/ParameterizedBuiltinCPInstruction.java
@@ -27,6 +27,7 @@ import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.sysds.common.Types.DataType;
import org.apache.sysds.common.Types.ValueType;
import org.apache.sysds.lops.Lop;
@@ -397,7 +398,7 @@ public class ParameterizedBuiltinCPInstruction extends
ComputationCPInstruction
}
@Override
- public LineageItem[] getLineageItems(ExecutionContext ec) {
+ public Pair<String, LineageItem> getLineageItem(ExecutionContext ec) {
String opcode = getOpcode();
if (opcode.equalsIgnoreCase("groupedagg")) {
CPOperand target = new
CPOperand(params.get(Statement.GAGG_TARGET), ValueType.FP64, DataType.MATRIX);
@@ -407,21 +408,20 @@ public class ParameterizedBuiltinCPInstruction extends
ComputationCPInstruction
CPOperand fn = new
CPOperand(params.get(Statement.GAGG_FN), ValueType.STRING, DataType.SCALAR,
true);
String ng =
params.containsKey(Statement.GAGG_NUM_GROUPS) ?
params.get(Statement.GAGG_NUM_GROUPS) : String.valueOf(-1);
CPOperand ngroups = new CPOperand(ng , ValueType.INT64,
DataType.SCALAR, true);
- return new LineageItem[]{new
LineageItem(output.getName(),
- getOpcode(), LineageItemUtils.getLineage(ec,
target, groups, weights, fn, ngroups))};
+ return Pair.of(output.getName(), new
LineageItem(getOpcode(),
+ LineageItemUtils.getLineage(ec, target, groups,
weights, fn, ngroups)));
}
else if (opcode.equalsIgnoreCase("rmempty")) {
CPOperand target = new CPOperand(params.get("target"),
ValueType.FP64, DataType.MATRIX);
CPOperand margin = new CPOperand(params.get("margin"),
ValueType.STRING, DataType.SCALAR, true);
String sl = params.containsKey("select") ?
params.get("select") : String.valueOf(-1);
CPOperand select = new CPOperand(sl, ValueType.FP64,
DataType.MATRIX);
- return new LineageItem[]{new
LineageItem(output.getName(),
- getOpcode(), LineageItemUtils.getLineage(ec,
target, margin, select))};
+ return Pair.of(output.getName(), new
LineageItem(getOpcode(),
+ LineageItemUtils.getLineage(ec, target, margin,
select)));
}
//TODO: generic interface to support all the ops
else
- return new LineageItem[]{new
LineageItem(output.getName(),
- getOpcode(),
LineageItemUtils.getLineage(ec, input1,input2,input3))};
-
+ return Pair.of(output.getName(), new
LineageItem(getOpcode(),
+ LineageItemUtils.getLineage(ec,
input1,input2,input3)));
}
}
diff --git
a/src/main/java/org/apache/sysds/runtime/instructions/cp/ScalarBuiltinNaryCPInstruction.java
b/src/main/java/org/apache/sysds/runtime/instructions/cp/ScalarBuiltinNaryCPInstruction.java
index 877bd1a..7fcef80 100644
---
a/src/main/java/org/apache/sysds/runtime/instructions/cp/ScalarBuiltinNaryCPInstruction.java
+++
b/src/main/java/org/apache/sysds/runtime/instructions/cp/ScalarBuiltinNaryCPInstruction.java
@@ -24,6 +24,7 @@ import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.sysds.api.DMLScript;
import org.apache.sysds.common.Types.ValueType;
import org.apache.sysds.runtime.DMLRuntimeException;
@@ -107,13 +108,10 @@ public class ScalarBuiltinNaryCPInstruction extends
BuiltinNaryCPInstruction imp
throw new DMLRuntimeException("Opcode (" + getOpcode()
+ ") not recognized in
ScalarBuiltinMultipleCPInstruction");
}
-
}
@Override
- public LineageItem[] getLineageItems(ExecutionContext ec) {
- return new LineageItem[]{new LineageItem(output.getName(),
- instString, getOpcode())};
+ public Pair<String, LineageItem> getLineageItem(ExecutionContext ec) {
+ return Pair.of(output.getName(), new LineageItem(instString,
getOpcode()));
}
-
}
diff --git
a/src/main/java/org/apache/sysds/runtime/instructions/cp/SpoofCPInstruction.java
b/src/main/java/org/apache/sysds/runtime/instructions/cp/SpoofCPInstruction.java
index d2a9b89..e1d12f5 100644
---
a/src/main/java/org/apache/sysds/runtime/instructions/cp/SpoofCPInstruction.java
+++
b/src/main/java/org/apache/sysds/runtime/instructions/cp/SpoofCPInstruction.java
@@ -21,6 +21,7 @@ package org.apache.sysds.runtime.instructions.cp;
import java.util.ArrayList;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.sysds.common.Types.DataType;
import org.apache.sysds.runtime.codegen.CodegenUtils;
import org.apache.sysds.runtime.codegen.SpoofOperator;
@@ -97,17 +98,13 @@ public class SpoofCPInstruction extends
ComputationCPInstruction {
}
@Override
- public LineageItem[] getLineageItems(ExecutionContext ec)
- {
+ public Pair<String, LineageItem> getLineageItem(ExecutionContext ec) {
//read and deepcopy the corresponding lineage DAG (pre-codegen)
LineageItem LIroot =
LineageCodegenItem.getCodegenLTrace(getOperatorClass().getName()).deepCopy();
//replace the placeholders with original instruction inputs.
LineageItemUtils.replaceDagLeaves(ec, LIroot, _in);
- //replace the placeholder name with original output's name
- LIroot.setName(output.getName());
-
- return new LineageItem[] {LIroot};
+ return Pair.of(output.getName(), LIroot);
}
}
diff --git
a/src/main/java/org/apache/sysds/runtime/instructions/cp/VariableCPInstruction.java
b/src/main/java/org/apache/sysds/runtime/instructions/cp/VariableCPInstruction.java
index 7c030c5..21b3f637 100644
---
a/src/main/java/org/apache/sysds/runtime/instructions/cp/VariableCPInstruction.java
+++
b/src/main/java/org/apache/sysds/runtime/instructions/cp/VariableCPInstruction.java
@@ -20,6 +20,7 @@
package org.apache.sysds.runtime.instructions.cp;
import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
@@ -1184,7 +1185,8 @@ public class VariableCPInstruction extends CPInstruction
implements LineageTrace
}
@Override
- public LineageItem[] getLineageItems(ExecutionContext ec) {
+ public Pair<String,LineageItem> getLineageItem(ExecutionContext ec) {
+ String varname = null;
LineageItem li = null;
switch (getVariableOpcode()) {
case CreateVariable:
@@ -1192,19 +1194,21 @@ public class VariableCPInstruction extends
CPInstruction implements LineageTrace
break; //otherwise fall through
case Read: {
- li = new LineageItem(getInput1().getName(),
toString(), getOpcode());
+ varname = getInput1().getName();
+ li = new
LineageItem(toString().replace(getInput1().getName(),
+
org.apache.sysds.lops.Data.PREAD_PREFIX+"xxx"), getOpcode());
break;
}
case AssignVariable: {
- li = new LineageItem(getInput2().getName(),
getOpcode(),
- new
LineageItem[]{ec.getLineage().getOrCreate(getInput1())});
+ varname = getInput2().getName();
+ li = new LineageItem(getOpcode(), new
LineageItem[]{ec.getLineage().getOrCreate(getInput1())});
break;
}
case CopyVariable: {
if (!ec.getLineage().contains(getInput1()))
throw new DMLRuntimeException("Could
not find LineageItem for " + getInput1().getName());
- li = new LineageItem(getInput2().getName(),
getOpcode(),
- new
LineageItem[]{ec.getLineage().get(getInput1())});
+ varname = getInput2().getName();
+ li = new LineageItem(getOpcode(), new
LineageItem[]{ec.getLineage().get(getInput1())});
break;
}
case Write: {
@@ -1214,21 +1218,8 @@ public class VariableCPInstruction extends CPInstruction
implements LineageTrace
lineages.add(ec.getLineage().getOrCreate(input));
if (_formatProperties != null &&
_formatProperties.getDescription() != null &&
!_formatProperties.getDescription().isEmpty())
lineages.add(new
LineageItem(_formatProperties.getDescription()));
- li = new LineageItem(getInput1().getName(),
- getOpcode(),
lineages.toArray(new LineageItem[0]));
- break;
- }
- case MoveVariable: {
- ArrayList<LineageItem> lineages = new
ArrayList<>();
- if (ec.getLineage().contains(getInput1()))
-
lineages.add(ec.getLineageItem(getInput1()));
- else {
-
lineages.add(ec.getLineage().getOrCreate(getInput1()));
- if (getInput3() != null)
-
lineages.add(ec.getLineage().getOrCreate(getInput3()));
- }
- li = new LineageItem(getInput2().getName(),
- getOpcode(), lineages.toArray(new
LineageItem[0]));
+ varname = getInput1().getName();
+ li = new LineageItem(getOpcode(),
lineages.toArray(new LineageItem[0]));
break;
}
case CastAsBooleanVariable:
@@ -1237,16 +1228,16 @@ public class VariableCPInstruction extends
CPInstruction implements LineageTrace
case CastAsScalarVariable:
case CastAsMatrixVariable:
case CastAsFrameVariable:{
- li = new LineageItem(getOutputVariableName(),
- getOpcode(),
LineageItemUtils.getLineage(ec, getInput1()));
+ varname = getOutputVariableName();
+ li = new LineageItem(getOpcode(),
LineageItemUtils.getLineage(ec, getInput1()));
break;
}
case RemoveVariable:
+ case MoveVariable:
default:
}
- return (li == null) ? null :
- new LineageItem[]{li};
+ return (li == null) ? null : Pair.of(varname, li);
}
public boolean isVariableCastInstruction() {
diff --git
a/src/main/java/org/apache/sysds/runtime/instructions/fed/ComputationFEDInstruction.java
b/src/main/java/org/apache/sysds/runtime/instructions/fed/ComputationFEDInstruction.java
index 8e79f9e..9d972f4 100644
---
a/src/main/java/org/apache/sysds/runtime/instructions/fed/ComputationFEDInstruction.java
+++
b/src/main/java/org/apache/sysds/runtime/instructions/fed/ComputationFEDInstruction.java
@@ -19,6 +19,7 @@
package org.apache.sysds.runtime.instructions.fed;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.sysds.api.DMLScript;
import org.apache.sysds.common.Types.ExecMode;
import org.apache.sysds.hops.OptimizerUtils;
@@ -79,8 +80,8 @@ public abstract class ComputationFEDInstruction extends
FEDInstruction implement
}
@Override
- public LineageItem[] getLineageItems(ExecutionContext ec) {
- return new LineageItem[]{new LineageItem(output.getName(),
- getOpcode(), LineageItemUtils.getLineage(ec,
input1, input2, input3))};
+ public Pair<String, LineageItem> getLineageItem(ExecutionContext ec) {
+ return Pair.of(output.getName(), new LineageItem(getOpcode(),
+ LineageItemUtils.getLineage(ec, input1, input2,
input3)));
}
}
diff --git
a/src/main/java/org/apache/sysds/runtime/instructions/spark/BuiltinNarySPInstruction.java
b/src/main/java/org/apache/sysds/runtime/instructions/spark/BuiltinNarySPInstruction.java
index b77f422..313af16 100644
---
a/src/main/java/org/apache/sysds/runtime/instructions/spark/BuiltinNarySPInstruction.java
+++
b/src/main/java/org/apache/sysds/runtime/instructions/spark/BuiltinNarySPInstruction.java
@@ -20,6 +20,7 @@
package org.apache.sysds.runtime.instructions.spark;
import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
@@ -202,8 +203,8 @@ public class BuiltinNarySPInstruction extends SPInstruction
implements LineageTr
}
@Override
- public LineageItem[] getLineageItems(ExecutionContext ec) {
- return new LineageItem[]{new LineageItem(output.getName(),
getOpcode(),
- LineageItemUtils.getLineage(ec, inputs))};
+ public Pair<String, LineageItem> getLineageItem(ExecutionContext ec) {
+ return Pair.of(output.getName(), new LineageItem(getOpcode(),
+ LineageItemUtils.getLineage(ec, inputs)));
}
}
diff --git
a/src/main/java/org/apache/sysds/runtime/instructions/spark/ComputationSPInstruction.java
b/src/main/java/org/apache/sysds/runtime/instructions/spark/ComputationSPInstruction.java
index d726c29..d380d91 100644
---
a/src/main/java/org/apache/sysds/runtime/instructions/spark/ComputationSPInstruction.java
+++
b/src/main/java/org/apache/sysds/runtime/instructions/spark/ComputationSPInstruction.java
@@ -19,6 +19,7 @@
package org.apache.sysds.runtime.instructions.spark;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
import org.apache.sysds.runtime.controlprogram.context.SparkExecutionContext;
@@ -127,8 +128,8 @@ public abstract class ComputationSPInstruction extends
SPInstruction implements
}
@Override
- public LineageItem[] getLineageItems(ExecutionContext ec) {
- return new LineageItem[]{new LineageItem(output.getName(),
getOpcode(),
- LineageItemUtils.getLineage(ec, input1, input2,
input3))};
+ public Pair<String, LineageItem> getLineageItem(ExecutionContext ec) {
+ return Pair.of(output.getName(), new LineageItem(getOpcode(),
+ LineageItemUtils.getLineage(ec, input1, input2,
input3)));
}
}
diff --git
a/src/main/java/org/apache/sysds/runtime/instructions/spark/MatrixIndexingSPInstruction.java
b/src/main/java/org/apache/sysds/runtime/instructions/spark/MatrixIndexingSPInstruction.java
index fddc69c..63e40c6 100644
---
a/src/main/java/org/apache/sysds/runtime/instructions/spark/MatrixIndexingSPInstruction.java
+++
b/src/main/java/org/apache/sysds/runtime/instructions/spark/MatrixIndexingSPInstruction.java
@@ -19,6 +19,7 @@
package org.apache.sysds.runtime.instructions.spark;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.spark.Partitioner;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.PairFlatMapFunction;
@@ -628,8 +629,8 @@ public class MatrixIndexingSPInstruction extends
IndexingSPInstruction {
}
@Override
- public LineageItem[] getLineageItems(ExecutionContext ec) {
- return new LineageItem[]{new LineageItem(output.getName(),
getOpcode(),
- LineageItemUtils.getLineage(ec,
input1,input2,input3,rowLower,rowUpper,colLower,colUpper))};
+ public Pair<String, LineageItem> getLineageItem(ExecutionContext ec) {
+ return Pair.of(output.getName(), new LineageItem(getOpcode(),
+ LineageItemUtils.getLineage(ec,
input1,input2,input3,rowLower,rowUpper,colLower,colUpper)));
}
}
diff --git
a/src/main/java/org/apache/sysds/runtime/instructions/spark/RandSPInstruction.java
b/src/main/java/org/apache/sysds/runtime/instructions/spark/RandSPInstruction.java
index 5c7adc0..a2058b7 100644
---
a/src/main/java/org/apache/sysds/runtime/instructions/spark/RandSPInstruction.java
+++
b/src/main/java/org/apache/sysds/runtime/instructions/spark/RandSPInstruction.java
@@ -19,6 +19,7 @@
package org.apache.sysds.runtime.instructions.spark;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.commons.math3.distribution.PoissonDistribution;
import org.apache.commons.math3.random.Well1024a;
import org.apache.hadoop.fs.FileSystem;
@@ -999,7 +1000,7 @@ public class RandSPInstruction extends UnarySPInstruction {
}
@Override
- public LineageItem[] getLineageItems(ExecutionContext ec) {
+ public Pair<String, LineageItem> getLineageItem(ExecutionContext ec) {
String tmpInstStr = instString;
if (getSeed() == DataGenOp.UNSPECIFIED_SEED) {
//generate pseudo-random seed (because not specified)
@@ -1011,6 +1012,6 @@ public class RandSPInstruction extends UnarySPInstruction
{
tmpInstStr = InstructionUtils.replaceOperand(
tmpInstStr, position,
String.valueOf(runtimeSeed));
}
- return new LineageItem[]{new LineageItem(output.getName(),
tmpInstStr, getOpcode())};
+ return Pair.of(output.getName(), new LineageItem(tmpInstStr,
getOpcode()));
}
}
diff --git
a/src/main/java/org/apache/sysds/runtime/instructions/spark/WriteSPInstruction.java
b/src/main/java/org/apache/sysds/runtime/instructions/spark/WriteSPInstruction.java
index 9257213..76cd245 100644
---
a/src/main/java/org/apache/sysds/runtime/instructions/spark/WriteSPInstruction.java
+++
b/src/main/java/org/apache/sysds/runtime/instructions/spark/WriteSPInstruction.java
@@ -20,6 +20,7 @@
package org.apache.sysds.runtime.instructions.spark;
import org.apache.commons.lang.ArrayUtils;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.spark.api.java.JavaPairRDD;
@@ -302,10 +303,10 @@ public class WriteSPInstruction extends SPInstruction
implements LineageTraceabl
}
@Override
- public LineageItem[] getLineageItems(ExecutionContext ec) {
+ public Pair<String, LineageItem> getLineageItem(ExecutionContext ec) {
LineageItem[] ret = LineageItemUtils.getLineage(ec, input1,
input2, input3, input4);
if (formatProperties != null &&
formatProperties.getDescription() != null &&
!formatProperties.getDescription().isEmpty())
ret = (LineageItem[])ArrayUtils.add(ret, new
LineageItem(formatProperties.getDescription()));
- return new LineageItem[]{new LineageItem(input1.getName(),
getOpcode(), ret)};
+ return Pair.of(input1.getName(), new LineageItem(getOpcode(),
ret));
}
}
diff --git a/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
b/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
index bf06136..5de0ee1 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
@@ -80,7 +80,7 @@ public class LineageCache
// will always fit in memory and hence can be pinned
unconditionally
if (LineageCacheConfig.isReusable(inst, ec)) {
ComputationCPInstruction cinst =
(ComputationCPInstruction) inst;
- LineageItem item = cinst.getLineageItems(ec)[0];
+ LineageItem item = cinst.getLineageItem(ec).getValue();
//atomic try reuse full/partial and set placeholder,
without
//obtaining value to avoid blocking in critical section
@@ -207,7 +207,7 @@ public class LineageCache
//TODO why do we need both of these public put methods
public static void putMatrix(Instruction inst, ExecutionContext ec,
long computetime) {
if (LineageCacheConfig.isReusable(inst, ec) ) {
- LineageItem item = ((LineageTraceable)
inst).getLineageItems(ec)[0];
+ LineageItem item = ((LineageTraceable)
inst).getLineageItem(ec).getValue();
//This method is called only to put matrix value
MatrixObject mo =
ec.getMatrixObject(((ComputationCPInstruction) inst).output);
synchronized( _cache ) {
@@ -221,7 +221,7 @@ public class LineageCache
return;
if (LineageCacheConfig.isReusable(inst, ec) ) {
//if (!isMarkedForCaching(inst, ec)) return;
- LineageItem item = ((LineageTraceable)
inst).getLineageItems(ec)[0];
+ LineageItem item = ((LineageTraceable)
inst).getLineageItem(ec).getValue();
Data data = ec.getVariable(((ComputationCPInstruction)
inst).output);
synchronized( _cache ) {
if (data instanceof MatrixObject)
diff --git a/src/main/java/org/apache/sysds/runtime/lineage/LineageItem.java
b/src/main/java/org/apache/sysds/runtime/lineage/LineageItem.java
index 7490dfa..e5345e8 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageItem.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageItem.java
@@ -28,7 +28,6 @@ public class LineageItem {
private final long _id;
private final String _opcode;
- private String _name;
private final String _data;
private final LineageItem[] _inputs;
private int _hash = 0;
@@ -39,58 +38,49 @@ public class LineageItem {
public enum LineageItemType {Literal, Creation, Instruction, Dedup}
public static final String dedupItemOpcode = "dedup";
- public LineageItem(long id, String name, String data) { this(id, name,
data, "", null); }
-
- public LineageItem(long id, String name, String opcode, LineageItem[]
inputs) { this(id, name, "", opcode ,inputs); }
-
- public LineageItem(String name) { this(_idSeq.getNextID(), name, name,
"", null); }
+ public LineageItem() {
+ this("");
+ }
- public LineageItem(String name, String data) { this(_idSeq.getNextID(),
name, data, "", null); }
+ public LineageItem(String data) {
+ this(_idSeq.getNextID(), data);
+ }
+
+ public LineageItem(long id, String data) {
+ this(id, data, "", null);
+ }
- public LineageItem(String name, String data, String opcode) {
this(_idSeq.getNextID(), name, data, opcode, null); }
+ public LineageItem(String data, String opcode) {
+ this(_idSeq.getNextID(), data, opcode, null);
+ }
- public LineageItem(String name, String opcode, LineageItem[] inputs) {
this(_idSeq.getNextID(), name, "", opcode, inputs); }
+ public LineageItem(String opcode, LineageItem[] inputs) {
+ this(_idSeq.getNextID(), "", opcode, inputs);
+ }
- public LineageItem(String name, String data, String opcode,
LineageItem[] inputs) { this(_idSeq.getNextID(), name, data, opcode, inputs); }
+ public LineageItem(String data, String opcode, LineageItem[] inputs) {
+ this(_idSeq.getNextID(), data, opcode, inputs);
+ }
- public LineageItem(long id, String name, String data, String opcode,
LineageItem[] inputs) {
- _id = id;
- _opcode = opcode;
- _name = name;
- _data = data;
- _inputs = inputs;
+ public LineageItem(LineageItem li) {
+ this(_idSeq.getNextID(), li);
}
public LineageItem(long id, LineageItem li) {
- _id = id;
- _opcode = li._opcode;
- _name = li._name;
- _data = li._data;
- _inputs = li._inputs;
+ this(id, li._data, li._opcode, li._inputs);
}
- public LineageItem(LineageItem other) {
- _id = _idSeq.getNextID();
- _opcode = other._opcode;
- _name = other._name;
- _data = other._data;
- _visited = other._visited;
- _hash = other._hash;
- _inputs = other._inputs;
+ public LineageItem(long id, String data, String opcode, LineageItem[]
inputs) {
+ _id = id;
+ _opcode = opcode;
+ _data = data;
+ _inputs = inputs;
}
public LineageItem[] getInputs() {
return _inputs;
}
- public String getName() {
- return _name;
- }
-
- public void setName(String name) {
- _name = name;
- }
-
public String getData() {
return _data;
}
@@ -149,14 +139,7 @@ public class LineageItem {
return true;
boolean ret = _opcode.equals(that._opcode);
-
- //If this is LineageItemType.Creation, remove _name in _data
- if (getType() == LineageItemType.Creation) {
- String this_data = _data.replace(_name, "");
- String that_data = that._data.replace(that._name, "");
- ret &= this_data.equals(that_data);
- } else
- ret &= _data.equals(that._data);
+ ret &= _data.equals(that._data);
if (_inputs != null && ret && (_inputs.length ==
that._inputs.length))
for (int i = 0; i < _inputs.length; i++)
@@ -174,11 +157,7 @@ public class LineageItem {
if (_inputs != null)
for (LineageItem li : _inputs)
h = UtilFunctions.intHashCode(h,
li.hashCode());
-
- //if Creation type, remove _name in _data
- _hash = UtilFunctions.intHashCode(h,
- ((getType() == LineageItemType.Creation) ?
- _data.replace(_name, "") : _data).hashCode());
+ _hash = UtilFunctions.intHashCode(h, _data.hashCode());
}
return _hash;
}
@@ -190,7 +169,7 @@ public class LineageItem {
LineageItem[] copyInputs = new LineageItem[getInputs().length];
for (int i=0; i<_inputs.length; i++)
copyInputs[i] = _inputs[i].deepCopy();
- return new LineageItem(_name, _opcode, copyInputs);
+ return new LineageItem(_opcode, copyInputs);
}
public boolean isLeaf() {
diff --git
a/src/main/java/org/apache/sysds/runtime/lineage/LineageItemUtils.java
b/src/main/java/org/apache/sysds/runtime/lineage/LineageItemUtils.java
index aeeacd3..1c787a2 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageItemUtils.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageItemUtils.java
@@ -85,6 +85,7 @@ import java.util.stream.Collectors;
public class LineageItemUtils {
private static final String LVARPREFIX = "lvar";
+ private static final String LPLACEHOLDER = "IN#";
public static LineageItemType getType(String str) {
if (str.length() == 1) {
@@ -404,7 +405,8 @@ public class LineageItemUtils {
if (ArrayUtils.contains(inputs, root) || spoof) {
Hop tmp = spoof ? spoofmap.get(root.getHopID()) : root;
int pos = ArrayUtils.indexOf(inputs, tmp);
- LineageItem li = new LineageItem(String.valueOf(pos),
"InputPlaceholder", "Create"+String.valueOf(root.getHopID()));
+ LineageItem li = new LineageItem(LPLACEHOLDER+pos,
+ "Create"+String.valueOf(root.getHopID()));
operands.put(tmp.getHopID(), li);
return;
}
@@ -453,7 +455,7 @@ public class LineageItemUtils {
sb.append(root.getValueType().toString());
sb.append(Instruction.VALUETYPE_PREFIX);
sb.append(true); //isLiteral = true
- li = new LineageItem(root.getName(), sb.toString());
+ li = new LineageItem(sb.toString());
}
else
throw new DMLRuntimeException("Unsupported hop:
"+root.getOpString());
@@ -492,22 +494,22 @@ public class LineageItemUtils {
for (LineageItem li : item.getInputs()[1].getInputs())
inputs.add(rDecompress(li));
- LineageItem li = new
LineageItem(item.getInputs()[1].getName(),
- item.getInputs()[1].getData(),
+ LineageItem li = new
LineageItem(item.getInputs()[1].getData(),
item.getInputs()[1].getOpcode(),
inputs.toArray(new LineageItem[0]));
li.resetVisitStatus();
- rSetDedupInputOntoOutput(item.getName(), li,
dedupInput);
+ rSetDedupInputOntoOutput(item.getData(), li,
dedupInput);
li.resetVisitStatus();
return li;
- } else {
+ }
+ else {
ArrayList<LineageItem> inputs = new ArrayList<>();
if (item.getInputs() != null) {
for (LineageItem li : item.getInputs())
inputs.add(rDecompress(li));
}
- return new LineageItem(item.getName(), item.getData(),
- item.getOpcode(), inputs.toArray(new
LineageItem[0]));
+ return new LineageItem(
+ item.getData(), item.getOpcode(),
inputs.toArray(new LineageItem[0]));
}
}
@@ -532,9 +534,13 @@ public class LineageItemUtils {
if (item.getInputs() != null)
for (int i = 0; i < item.getInputs().length; i++) {
LineageItem li = item.getInputs()[i];
-
- if (li.getName().equals(name))
- item.getInputs()[i] = dedupInput;
+ //replace CPOperand literals (placeholders)
+ //TODO should use the same placeholder meta
data as codegen
+ if( li.getType() == LineageItemType.Literal ) {
+ CPOperand tmp = new
CPOperand(li.getData());
+ if( !tmp.isLiteral() &&
tmp.getName().equals(name) )
+ item.getInputs()[i] =
dedupInput;
+ }
rSetDedupInputOntoOutput(name, li, dedupInput);
}
@@ -553,7 +559,7 @@ public class LineageItemUtils {
if( current.isVisited() || current.getInputs() == null )
return;
if( liNew == null )
- throw new DMLRuntimeException("Invalid null lineage
item for "+liOld.getName());
+ throw new DMLRuntimeException("Invalid null lineage
item for "+liOld.getId());
//process children until old item found, then replace
for(int i=0; i<current.getInputs().length; i++) {
LineageItem tmp = current.getInputs()[i];
@@ -578,9 +584,10 @@ public class LineageItemUtils {
for (int i=0; i<root.getInputs().length; i++) {
LineageItem li = root.getInputs()[i];
- if (li.isLeaf() && li.getType() !=
LineageItemType.Literal)
- //order-preserving replacement. Name represents
relative position.
- root.getInputs()[i] =
newleaves[Integer.parseInt(li.getName())];
+ if (li.isLeaf() && li.getType() !=
LineageItemType.Literal
+ && li.getData().startsWith(LPLACEHOLDER))
+ //order-preserving replacement. IN#<xxx>
represents relative position xxx
+ root.getInputs()[i] =
newleaves[Integer.parseInt(li.getData().substring(3))];
else
rReplaceDagLeaves(li, newleaves);
}
diff --git a/src/main/java/org/apache/sysds/runtime/lineage/LineageMap.java
b/src/main/java/org/apache/sysds/runtime/lineage/LineageMap.java
index b766cb0..4216a93 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageMap.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageMap.java
@@ -19,6 +19,7 @@
package org.apache.sysds.runtime.lineage;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.sysds.api.DMLScript;
import org.apache.sysds.common.Types;
import org.apache.sysds.runtime.DMLRuntimeException;
@@ -57,21 +58,26 @@ public class LineageMap {
if (!(inst instanceof LineageTraceable))
throw new DMLRuntimeException("Unknown Instruction (" +
inst.getOpcode() + ") traced.");
- LineageItem[] items = ((LineageTraceable)
inst).getLineageItems(ec);
- if (items == null || items.length < 1)
- trace(inst, ec, null);
+ if( ((LineageTraceable) inst).hasSingleLineage() ) {
+ trace(inst, ec, ((LineageTraceable)
inst).getLineageItem(ec));
+ }
else {
- for (LineageItem li : items)
- trace(inst, ec, cleanupInputLiterals(li, ec));
+ Pair<String, LineageItem>[] items = ((LineageTraceable)
inst).getLineageItems(ec);
+ if (items == null || items.length < 1)
+ trace(inst, ec, null);
+ else {
+ for (Pair<String, LineageItem> li : items)
+ trace(inst, ec,
cleanupInputLiterals(li, ec));
+ }
}
}
public void processDedupItem(LineageMap lm, Long path) {
for (Map.Entry<String, LineageItem> entry :
lm._traces.entrySet()) {
if (_traces.containsKey(entry.getKey())) {
- addLineageItem(new LineageItem(entry.getKey(),
- path.toString(),
LineageItem.dedupItemOpcode,
- new
LineageItem[]{_traces.get(entry.getKey()), entry.getValue()}));
+ addLineageItem(Pair.of(entry.getKey(),
+ new LineageItem(entry.getKey(),
LineageItem.dedupItemOpcode,
+ new
LineageItem[]{_traces.get(entry.getKey()), entry.getValue()})));
}
}
}
@@ -84,14 +90,13 @@ public class LineageMap {
if (variable.isLiteral()) {
LineageItem ret = _literals.get(varname);
if (ret == null)
- _literals.put(varname, ret = new LineageItem(
- varname, variable.getLineageLiteral()));
+ _literals.put(varname, ret = new
LineageItem(variable.getLineageLiteral()));
return ret;
}
//handle variables
LineageItem ret = _traces.get(variable.getName());
return (ret != null) ? ret :
- new LineageItem(varname, variable.getLineageLiteral());
+ new LineageItem(variable.getLineageLiteral());
}
public LineageItem get(String varName) {
@@ -133,7 +138,7 @@ public class LineageMap {
return _literals;
}
- private void trace(Instruction inst, ExecutionContext ec, LineageItem
li) {
+ private void trace(Instruction inst, ExecutionContext ec, Pair<String,
LineageItem> li) {
if (inst instanceof VariableCPInstruction) {
VariableCPInstruction vcp_inst =
((VariableCPInstruction) inst);
@@ -159,7 +164,7 @@ public class LineageMap {
break;
}
case MoveVariable: {
- processMoveLI(li);
+
moveLineageItem(vcp_inst.getInput1().getName(), vcp_inst.getInput2().getName());
break;
}
case CastAsBooleanVariable:
@@ -183,12 +188,13 @@ public class LineageMap {
}
- private LineageItem cleanupInputLiterals(LineageItem li,
ExecutionContext ec) {
- if( li.getInputs() == null )
+ private Pair<String, LineageItem> cleanupInputLiterals(Pair<String,
LineageItem> li, ExecutionContext ec) {
+ LineageItem item = li.getValue();
+ if( item.getInputs() == null )
return li;
// fix literals referring to variables (e.g., for/parfor loop
variable)
- for(int i=0; i<li.getInputs().length; i++) {
- LineageItem tmp = li.getInputs()[i];
+ for(int i=0; i<item.getInputs().length; i++) {
+ LineageItem tmp = item.getInputs()[i];
if( tmp.getType() != LineageItemType.Literal)
continue;
//check if CPOperand is not a literal, w/o parsing
@@ -196,28 +202,34 @@ public class LineageMap {
CPOperand cp = new CPOperand(tmp.getData());
if( cp.getDataType().isScalar() ) {
cp.setLiteral(ec.getScalarInput(cp));
- li.getInputs()[i] = getOrCreate(cp);
+ item.getInputs()[i] = getOrCreate(cp);
}
}
}
return li;
}
- private void processCopyLI(LineageItem li) {
- if (li.getInputs().length != 1)
+ private void processCopyLI(Pair<String, LineageItem> li) {
+ if (li.getValue().getInputs().length != 1)
throw new DMLRuntimeException("AssignVariable and
CopyVariable must have one input lineage item!");
//add item or overwrite existing item
- _traces.put(li.getName(), li.getInputs()[0]);
+ _traces.put(li.getKey(), li.getValue().getInputs()[0]);
+ }
+
+ private void moveLineageItem(String keyFrom, String keyTo) {
+ LineageItem input = removeLineageItem(keyFrom);
+ if (!keyTo.equals("__pred"))
+ _traces.put(keyTo, input);
}
- private void removeLineageItem(String key) {
+ private LineageItem removeLineageItem(String key) {
//remove item if present
- _traces.remove(key);
+ return _traces.remove(key);
}
- private void addLineageItem(LineageItem li) {
+ private void addLineageItem(Pair<String, LineageItem> li) {
//add item or overwrite existing item
- _traces.put(li.getName(), li);
+ _traces.put(li.getKey(), li.getValue());
}
private void processWriteLI(CPOperand input1, CPOperand input2,
ExecutionContext ec) {
@@ -230,13 +242,4 @@ public class LineageMap {
}
LineageItemUtils.writeTraceToHDFS(Explain.explain(li), fName +
".lineage");
}
-
- private void processMoveLI(LineageItem li) {
- if (li.getName().equals("__pred"))
- removeLineageItem(li.getInputs()[0].getName());
- else {
- //remove from old and move to new key
- _traces.put(li.getName(), li.getInputs()[0]);
- }
- }
}
diff --git a/src/main/java/org/apache/sysds/runtime/lineage/LineageParser.java
b/src/main/java/org/apache/sysds/runtime/lineage/LineageParser.java
index d6b9ab8..92cfa2c 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageParser.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageParser.java
@@ -20,12 +20,12 @@
package org.apache.sysds.runtime.lineage;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.sysds.parser.ParseException;
import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
import org.apache.sysds.runtime.controlprogram.context.ExecutionContextFactory;
import org.apache.sysds.runtime.instructions.Instruction;
import org.apache.sysds.runtime.instructions.InstructionParser;
-import org.apache.sysds.runtime.instructions.cp.CPOperand;
import java.util.ArrayList;
import java.util.HashMap;
@@ -65,18 +65,15 @@ public class LineageParser
Instruction inst =
InstructionParser.parseSingleInstruction(representation);
if (!(inst instanceof LineageTraceable))
throw new
ParseException("Invalid Instruction (" + inst.getOpcode() + ") traced");
-
- LineageItem[] items =
((LineageTraceable) inst).getLineageItems(ec);
- if (items == null)
+ Pair<String,LineageItem> item =
((LineageTraceable) inst).getLineageItem(ec);
+ if (item == null)
throw new
ParseException("Instruction without output (" + inst.getOpcode() + ") not
supported");
- if (items.length != 1)
- throw new
ParseException("Instruction with multiple outputs (" + inst.getOpcode() + ")
not supported");
- li = new LineageItem(id, items[0]);
+ li = new LineageItem(id,
item.getValue());
break;
case Literal:
- li = new LineageItem(id, new
CPOperand(representation).getName(), representation);
+ li = new LineageItem(id,
representation);
break;
case Instruction:
@@ -105,6 +102,6 @@ public class LineageParser
} else
throw new ParseException("Invalid format for
LineageItem reference");
}
- return new LineageItem(id, name, opcode, inputs.toArray(new
LineageItem[0]));
+ return new LineageItem(id, "", opcode, inputs.toArray(new
LineageItem[0]));
}
}
\ No newline at end of file
diff --git
a/src/main/java/org/apache/sysds/runtime/lineage/LineageRewriteReuse.java
b/src/main/java/org/apache/sysds/runtime/lineage/LineageRewriteReuse.java
index dc1d2da..8544388 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageRewriteReuse.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageRewriteReuse.java
@@ -643,9 +643,9 @@ public class LineageRewriteReuse
}
// If the input to tsmm came from cbind, look for both the
inputs in cache.
- LineageItem[] items = ((ComputationCPInstruction)
curr).getLineageItems(ec);
+ LineageItem item = ((ComputationCPInstruction)
curr).getLineageItem(ec).getValue();
if (curr.getOpcode().equalsIgnoreCase("tsmm")) {
- LineageItem source = items[0].getInputs()[0];
+ LineageItem source = item.getInputs()[0];
if (source.getOpcode().equalsIgnoreCase("cbind")) {
//for (LineageItem input : source.getInputs()) {
// create tsmm lineage on top of the input of
last append
@@ -669,9 +669,9 @@ public class LineageRewriteReuse
}
// If the input to tsmm came from cbind, look for both the
inputs in cache.
- LineageItem[] items = ((ComputationCPInstruction)
curr).getLineageItems(ec);
+ LineageItem item = ((ComputationCPInstruction)
curr).getLineageItem(ec).getValue();
if (curr.getOpcode().equalsIgnoreCase("tsmm")) {
- LineageItem source = items[0].getInputs()[0];
+ LineageItem source = item.getInputs()[0];
if (source.getOpcode().equalsIgnoreCase("cbind")) {
// check if the appended column is a matrix of
1s
LineageItem input2 = source.getInputs()[1];
@@ -697,9 +697,9 @@ public class LineageRewriteReuse
return false;
// If the input to tsmm came from rbind, look for both the
inputs in cache.
- LineageItem[] items = ((ComputationCPInstruction)
curr).getLineageItems(ec);
+ LineageItem item = ((ComputationCPInstruction)
curr).getLineageItem(ec).getValue();
if (curr.getOpcode().equalsIgnoreCase("tsmm")) {
- LineageItem source = items[0].getInputs()[0];
+ LineageItem source = item.getInputs()[0];
if (source.getOpcode().equalsIgnoreCase("rbind")) {
// create tsmm lineage on top of the input of
last append
LineageItem input1 = source.getInputs()[0];
@@ -722,10 +722,10 @@ public class LineageRewriteReuse
//TODO: support nary cbind
// If the input to tsmm came from cbind, look for both the
inputs in cache.
- LineageItem[] items = ((ComputationCPInstruction)
curr).getLineageItems(ec);
+ LineageItem item = ((ComputationCPInstruction)
curr).getLineageItem(ec).getValue();
// look for two consecutive cbinds
if (curr.getOpcode().equalsIgnoreCase("tsmm")) {
- LineageItem source = items[0].getInputs()[0];
+ LineageItem source = item.getInputs()[0];
if (source.getOpcode().equalsIgnoreCase("cbind")) {
LineageItem input = source.getInputs()[0];
if
(input.getOpcode().equalsIgnoreCase("cbind")) {
@@ -750,10 +750,10 @@ public class LineageRewriteReuse
return false;
// If the left input to ba+* came from rbind, look for both the
inputs in cache.
- LineageItem[] items = ((ComputationCPInstruction)
curr).getLineageItems(ec);
+ LineageItem item = ((ComputationCPInstruction)
curr).getLineageItem(ec).getValue();
if (curr.getOpcode().equalsIgnoreCase("ba+*")) {
- LineageItem left= items[0].getInputs()[0];
- LineageItem right = items[0].getInputs()[1];
+ LineageItem left= item.getInputs()[0];
+ LineageItem right = item.getInputs()[1];
if (left.getOpcode().equalsIgnoreCase("rbind")){
LineageItem leftSource = left.getInputs()[0];
//left inpur of rbind = X
// create ba+* lineage on top of the input of
last append
@@ -775,10 +775,10 @@ public class LineageRewriteReuse
return false;
// If the right input to ba+* came from cbind, look for both
the inputs in cache.
- LineageItem[] items = ((ComputationCPInstruction)
curr).getLineageItems(ec);
+ LineageItem item = ((ComputationCPInstruction)
curr).getLineageItem(ec).getValue();
if (curr.getOpcode().equalsIgnoreCase("ba+*")) {
- LineageItem left = items[0].getInputs()[0];
- LineageItem right = items[0].getInputs()[1];
+ LineageItem left = item.getInputs()[0];
+ LineageItem right = item.getInputs()[1];
if (right.getOpcode().equalsIgnoreCase("cbind")) {
LineageItem rightSource = right.getInputs()[0];
//left inpur of rbind = X
// create ba+* lineage on top of the input of
last append
@@ -799,10 +799,10 @@ public class LineageRewriteReuse
return false;
// If the right input to ba+* came from cbind of a matrix and
ones.
- LineageItem[] items = ((ComputationCPInstruction)
curr).getLineageItems(ec);
+ LineageItem item = ((ComputationCPInstruction)
curr).getLineageItem(ec).getValue();
if (curr.getOpcode().equalsIgnoreCase("ba+*")) {
- LineageItem left = items[0].getInputs()[0];
- LineageItem right = items[0].getInputs()[1];
+ LineageItem left = item.getInputs()[0];
+ LineageItem right = item.getInputs()[1];
if (right.getOpcode().equalsIgnoreCase("cbind")) {
LineageItem rightSource1 =
right.getInputs()[0]; //left input of cbind is X
LineageItem rightSource2 =
right.getInputs()[1];
@@ -827,10 +827,10 @@ public class LineageRewriteReuse
return false;
// If the inputs to * came from rbind, look for both the inputs
in cache.
- LineageItem[] items = ((ComputationCPInstruction)
curr).getLineageItems(ec);
+ LineageItem item = ((ComputationCPInstruction)
curr).getLineageItem(ec).getValue();
if (curr.getOpcode().equalsIgnoreCase("*")) {
- LineageItem left= items[0].getInputs()[0];
- LineageItem right = items[0].getInputs()[1];
+ LineageItem left= item.getInputs()[0];
+ LineageItem right = item.getInputs()[1];
if (left.getOpcode().equalsIgnoreCase("rbind") &&
right.getOpcode().equalsIgnoreCase("rbind")){
LineageItem leftSource = left.getInputs()[0];
//left inpur of rbind = X
LineageItem rightSource = right.getInputs()[0];
//right inpur of rbind = Y
@@ -854,10 +854,10 @@ public class LineageRewriteReuse
return false;
// If the inputs to * came from cbind, look for both the inputs
in cache.
- LineageItem[] items = ((ComputationCPInstruction)
curr).getLineageItems(ec);
+ LineageItem item = ((ComputationCPInstruction)
curr).getLineageItem(ec).getValue();
if (curr.getOpcode().equalsIgnoreCase("*")) {
- LineageItem left= items[0].getInputs()[0];
- LineageItem right = items[0].getInputs()[1];
+ LineageItem left= item.getInputs()[0];
+ LineageItem right = item.getInputs()[1];
if (left.getOpcode().equalsIgnoreCase("cbind") &&
right.getOpcode().equalsIgnoreCase("cbind")){
LineageItem leftSource = left.getInputs()[0];
//left inpur of cbind = X
LineageItem rightSource = right.getInputs()[0];
//right inpur of cbind = Y
@@ -882,13 +882,13 @@ public class LineageRewriteReuse
}
// If the input to groupedagg came from cbind, look for both
the inputs in cache.
- LineageItem[] items = ((ComputationCPInstruction)
curr).getLineageItems(ec);
+ LineageItem item = ((ComputationCPInstruction)
curr).getLineageItem(ec).getValue();
if (curr.getOpcode().equalsIgnoreCase("groupedagg")) {
- LineageItem target = items[0].getInputs()[0];
- LineageItem groups = items[0].getInputs()[1];
- LineageItem weights = items[0].getInputs()[2];
- LineageItem fn = items[0].getInputs()[3];
- LineageItem ngroups = items[0].getInputs()[4];
+ LineageItem target = item.getInputs()[0];
+ LineageItem groups = item.getInputs()[1];
+ LineageItem weights = item.getInputs()[2];
+ LineageItem fn = item.getInputs()[3];
+ LineageItem ngroups = item.getInputs()[4];
if (target.getOpcode().equalsIgnoreCase("cbind")) {
// create groupedagg lineage on top of the
input of last append
LineageItem input1 = target.getInputs()[0];
diff --git
a/src/main/java/org/apache/sysds/runtime/lineage/LineageTraceable.java
b/src/main/java/org/apache/sysds/runtime/lineage/LineageTraceable.java
index f7e44f4..aca3e8f 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageTraceable.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageTraceable.java
@@ -19,8 +19,36 @@
package org.apache.sysds.runtime.lineage;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
-public interface LineageTraceable {
- public LineageItem[] getLineageItems(ExecutionContext ec);
+public interface LineageTraceable
+{
+ /**
+ * Obtain meta data on number of outputs and thus, number of lineage
items.
+ *
+ * @return true iff instruction has a single output
+ */
+ public default boolean hasSingleLineage() {
+ return true;
+ }
+
+ /**
+ * Obtain lineage trace of an instruction with a single output.
+ *
+ * @param ec execution context w/ live variables
+ * @return pair of (output variable name, output lineage item)
+ */
+ public Pair<String,LineageItem> getLineageItem(ExecutionContext ec);
+
+ /**
+ * Obtain lineage trace of an instruction with multiple outputs.
+ *
+ * @param ec execution context w/ live variables
+ * @return pairs of (output variable name, output lineage item)
+ */
+ public default Pair<String,LineageItem>[]
getLineageItems(ExecutionContext ec) {
+ throw new DMLRuntimeException("Unsupported call for instruction
with single output.");
+ }
}
diff --git
a/src/test/java/org/apache/sysds/test/functions/lineage/LineageReadTest.java
b/src/test/java/org/apache/sysds/test/functions/lineage/LineageReadTest.java
index cc57d13..c0bf5d6 100644
--- a/src/test/java/org/apache/sysds/test/functions/lineage/LineageReadTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/lineage/LineageReadTest.java
@@ -50,7 +50,7 @@ public class LineageReadTest extends AutomatedTestBase {
getAndLoadTestConfiguration(TEST_NAME);
String lineage =
- "(0) (C)
CP°createvar°pREADX°target/testTemp/functions/lineage/LineageTraceTest/in/X°false°MATRIX°text°10°5°-1°-1°copy\n"
+
+ "(0) (C)
CP°createvar°pREADxxx°target/testTemp/functions/lineage/LineageTraceTest/in/X°false°MATRIX°text°10°5°-1°-1°copy\n"
+
"(2) (I) rblk (0)\n" +
"(4) (L) 3·SCALAR·INT64·true\n" +
"(5) (I) * (2) (4)\n" +
diff --git
a/src/test/java/org/apache/sysds/test/functions/lineage/LineageRewriteTest.java
b/src/test/java/org/apache/sysds/test/functions/lineage/LineageRewriteTest.java
index 9942099..8b34e0e 100644
---
a/src/test/java/org/apache/sysds/test/functions/lineage/LineageRewriteTest.java
+++
b/src/test/java/org/apache/sysds/test/functions/lineage/LineageRewriteTest.java
@@ -124,13 +124,13 @@ public class LineageRewriteTest extends AutomatedTestBase
{
fullDMLScriptName = getScript();
double[][] X = getRandomMatrix(numRecords, numFeatures,
0, 1, 0.8, -1);
double[][] Y = !elementwise ?
getRandomMatrix(numFeatures, numRecords, 0, 1, 0.8, -1)
- :
getRandomMatrix(numRecords, numFeatures, 0, 1, 0.8, -1);
+ : getRandomMatrix(numRecords, numFeatures, 0,
1, 0.8, -1);
if (classes > 0) {
Y = getRandomMatrix(numRecords, 1, 0, 1, 1,
-1);
for(int i=0; i<numRecords; i++){
Y[i][0] = (int)(Y[i][0]*classes) + 1;
Y[i][0] = (Y[i][0] > classes) ?
classes : Y[i][0];
- }
+ }
}
writeInputMatrixWithMTD("X", X, true);
writeInputMatrixWithMTD("Y", Y, true);