This is an automated email from the ASF dual-hosted git repository.
arnabp20 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/master by this push:
new 68e5e29 [SYSTEMDS-2583] Add test for a mini-batch scenario with dedup
68e5e29 is described below
commit 68e5e298c73d1013c528d76c789dba77fd106eb8
Author: arnabp <[email protected]>
AuthorDate: Sun Feb 28 14:48:53 2021 +0100
[SYSTEMDS-2583] Add test for a mini-batch scenario with dedup
This patch fixes a bug in placeholder handling in lineage deduplication.
In addition, it adds a test to reuse preprocessing in a mini-batch-like
scenario with deduplication.
---
.../apache/sysds/parser/WhileStatementBlock.java | 6 ++-
.../apache/sysds/runtime/lineage/LineageCache.java | 22 +++++++++
.../runtime/lineage/LineageCacheEviction.java | 2 +-
.../sysds/runtime/lineage/LineageDedupUtils.java | 14 +++++-
.../test/functions/lineage/DedupReuseTest.java | 9 +++-
src/test/scripts/functions/lineage/DedupReuse6.dml | 56 ++++++++++++++++++++++
6 files changed, 103 insertions(+), 6 deletions(-)
diff --git a/src/main/java/org/apache/sysds/parser/WhileStatementBlock.java
b/src/main/java/org/apache/sysds/parser/WhileStatementBlock.java
index 89f9261..7a09242 100644
--- a/src/main/java/org/apache/sysds/parser/WhileStatementBlock.java
+++ b/src/main/java/org/apache/sysds/parser/WhileStatementBlock.java
@@ -21,6 +21,7 @@ package org.apache.sysds.parser;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
import org.apache.sysds.conf.ConfigurationManager;
import org.apache.sysds.hops.Hop;
@@ -259,11 +260,12 @@ public class WhileStatementBlock extends StatementBlock
// By calling getInputstoSB on all the child statement blocks,
// we remove the variables only read in the while predicate but
// never used in the body from the input list.
- ArrayList<String> inputs = new ArrayList<>();
+ HashSet<String> inputs = new HashSet<>();
WhileStatement fstmt = (WhileStatement)_statements.get(0);
for (StatementBlock sb : fstmt.getBody())
inputs.addAll(sb.getInputstoSB());
- return inputs;
+ // Hashset ensures no duplicates in the variable list
+ return new ArrayList<>(inputs);
}
@Override
diff --git a/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
b/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
index 40962ec..5010c1c 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
@@ -329,6 +329,28 @@ public class LineageCache
return p;
}
+ //This method is for hard removal of an entry, w/o maintaining eviction
data structures
+ public static void removeEntry(LineageItem key) {
+ boolean p = _cache.containsKey(key);
+ if (!p) return;
+ synchronized(_cache) {
+ LineageCacheEntry e = getEntry(key);
+ long size = e.getSize();
+ if (e._origItem == null)
+ _cache.remove(e._key);
+
+ else {
+ LineageCacheEntry h = _cache.get(e._origItem);
//head
+ while (h != null) {
+ LineageCacheEntry tmp = h;
+ h = h._nextEntry;
+ _cache.remove(tmp._key);
+ }
+ }
+ LineageCacheEviction.updateSize(size, false);
+ }
+ }
+
public static MatrixBlock getMatrix(LineageItem key) {
LineageCacheEntry e = null;
synchronized( _cache ) {
diff --git
a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheEviction.java
b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheEviction.java
index c64d49b..79c1e41 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheEviction.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheEviction.java
@@ -33,7 +33,7 @@ import org.apache.sysds.runtime.util.LocalFileUtils;
public class LineageCacheEviction
{
- private static long _cachesize = 0;
+ protected static long _cachesize = 0;
private static long CACHE_LIMIT; //limit in bytes
private static long _startTimestamp = 0;
protected static final Map<LineageItem, Integer> _removelist = new
HashMap<>();
diff --git
a/src/main/java/org/apache/sysds/runtime/lineage/LineageDedupUtils.java
b/src/main/java/org/apache/sysds/runtime/lineage/LineageDedupUtils.java
index b781bbd..396c287 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageDedupUtils.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageDedupUtils.java
@@ -108,10 +108,15 @@ public class LineageDedupUtils {
LineageItem[] liinputs =
LineageItemUtils.getLineageItemInputstoSB(inputnames, ec);
// TODO: find the inputs from the ProgramBlock instead of
StatementBlock
String ph = LineageItemUtils.LPLACEHOLDER;
- for (int i=0; i<liinputs.length; i++) {
+ int i = 0;
+ for (String input : inputnames) {
+ // Skip empty variables to correctly map lineage items
to live variables
+ if (ec.getVariable(input) == null)
+ continue;
// Wrap the inputs with order-preserving placeholders.
LineageItem phInput = new
LineageItem(ph+String.valueOf(i), new LineageItem[] {liinputs[i]});
- _tmpLineage.set(inputnames.get(i), phInput);
+ _tmpLineage.set(input, phInput);
+ i++;
}
// also copy the dedupblock to trace the taken path (bitset)
_tmpLineage.setDedupBlock(ldb);
@@ -194,6 +199,11 @@ public class LineageDedupUtils {
}
root.resetVisitStatusNR();
cutAtPlaceholder(root);
+ if (!LineageCacheConfig.ReuseCacheType.isNone())
+ // These chopped DAGs can lead to incorrect
reuse.
+ // FIXME: This logic removes only the live
variables from lineage cache.
+ // Need a way to remove all entries that
are cached in this iteration.
+ LineageCache.removeEntry(root);
}
lmap.getTraces().keySet().removeAll(emptyRoots);
}
diff --git
a/src/test/java/org/apache/sysds/test/functions/lineage/DedupReuseTest.java
b/src/test/java/org/apache/sysds/test/functions/lineage/DedupReuseTest.java
index a19eaa3..640cb74 100644
--- a/src/test/java/org/apache/sysds/test/functions/lineage/DedupReuseTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/lineage/DedupReuseTest.java
@@ -44,6 +44,7 @@ public class DedupReuseTest extends AutomatedTestBase
protected static final String TEST_NAME3 = "DedupReuse3";
protected static final String TEST_NAME4 = "DedupReuse4";
protected static final String TEST_NAME5 = "DedupReuse5";
+ protected static final String TEST_NAME6 = "DedupReuse6";
protected String TEST_CLASS_DIR = TEST_DIR +
DedupReuseTest.class.getSimpleName() + "/";
@@ -54,7 +55,7 @@ public class DedupReuseTest extends AutomatedTestBase
@Override
public void setUp() {
TestUtils.clearAssertionInformation();
- for(int i=1; i<=5; i++)
+ for(int i=1; i<=6; i++)
addTestConfiguration(TEST_NAME+i, new
TestConfiguration(TEST_CLASS_DIR, TEST_NAME+i));
}
@@ -87,6 +88,12 @@ public class DedupReuseTest extends AutomatedTestBase
// Reuse an operation for each iteration of a dedup loop
testLineageTrace(TEST_NAME4);
}
+
+ @Test
+ public void testLineageTrace6() {
+ // Reuse minibatch-wise preprocessing in a mini-batch like
scenario
+ testLineageTrace(TEST_NAME6);
+ }
public void testLineageTrace(String testname) {
boolean old_simplification =
OptimizerUtils.ALLOW_ALGEBRAIC_SIMPLIFICATION;
diff --git a/src/test/scripts/functions/lineage/DedupReuse6.dml
b/src/test/scripts/functions/lineage/DedupReuse6.dml
new file mode 100644
index 0000000..d1263cd
--- /dev/null
+++ b/src/test/scripts/functions/lineage/DedupReuse6.dml
@@ -0,0 +1,56 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+D = rand(rows=1000, cols=784, min=0, max=20, seed=42)
+bs = 128;
+ep = 5;
+iter_ep = ceil(nrow(D)/bs);
+maxiter = ep * iter_ep;
+beg = 1;
+iter = 0;
+i = 1;
+
+while (iter < maxiter) {
+ end = beg + bs - 1;
+ if (end>nrow(D))
+ end = nrow(D);
+ X = D[beg:end,]
+
+ #reusable OP across epochs
+ X = scale(X, TRUE, TRUE);
+
+ #not reusable OPs
+ X = ((X + X) * i - X) / (i+1)
+ X = ((X + X) * i - X) / (i+1)
+ X = ((X + X) * i - X) / (i+1)
+ X = ((X + X) * i - X) / (i+1)
+ X = ((X + X) * i - X) / (i+1)
+
+ iter = iter + 1;
+ if (end == nrow(D))
+ beg = 1;
+ else
+ beg = end + 1;
+ i = i + 1;
+
+}
+write(X, $1, format="text");
+