This is an automated email from the ASF dual-hosted git repository.

arnabp20 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/master by this push:
     new 68e5e29  [SYSTEMDS-2583] Add test for a mini-batch scenario with dedup
68e5e29 is described below

commit 68e5e298c73d1013c528d76c789dba77fd106eb8
Author: arnabp <[email protected]>
AuthorDate: Sun Feb 28 14:48:53 2021 +0100

    [SYSTEMDS-2583] Add test for a mini-batch scenario with dedup
    
    This patch fixes a bug in placeholder handling in lineage deduplication.
    In addition, it adds a test to reuse preprocessing in a
    mini-batch-like scenario with deduplication.
---
 .../apache/sysds/parser/WhileStatementBlock.java   |  6 ++-
 .../apache/sysds/runtime/lineage/LineageCache.java | 22 +++++++++
 .../runtime/lineage/LineageCacheEviction.java      |  2 +-
 .../sysds/runtime/lineage/LineageDedupUtils.java   | 14 +++++-
 .../test/functions/lineage/DedupReuseTest.java     |  9 +++-
 src/test/scripts/functions/lineage/DedupReuse6.dml | 56 ++++++++++++++++++++++
 6 files changed, 103 insertions(+), 6 deletions(-)

diff --git a/src/main/java/org/apache/sysds/parser/WhileStatementBlock.java 
b/src/main/java/org/apache/sysds/parser/WhileStatementBlock.java
index 89f9261..7a09242 100644
--- a/src/main/java/org/apache/sysds/parser/WhileStatementBlock.java
+++ b/src/main/java/org/apache/sysds/parser/WhileStatementBlock.java
@@ -21,6 +21,7 @@ package org.apache.sysds.parser;
 
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 
 import org.apache.sysds.conf.ConfigurationManager;
 import org.apache.sysds.hops.Hop;
@@ -259,11 +260,12 @@ public class WhileStatementBlock extends StatementBlock
                // By calling getInputstoSB on all the child statement blocks,
                // we remove the variables only read in the while predicate but
                // never used in the body from the input list.
-               ArrayList<String> inputs = new ArrayList<>();
+               HashSet<String> inputs = new HashSet<>();
                WhileStatement fstmt = (WhileStatement)_statements.get(0);
                for (StatementBlock sb : fstmt.getBody())
                        inputs.addAll(sb.getInputstoSB());
-               return inputs;
+               // Hashset ensures no duplicates in the variable list
+               return new ArrayList<>(inputs);
        }
        
        @Override
diff --git a/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java 
b/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
index 40962ec..5010c1c 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
@@ -329,6 +329,28 @@ public class LineageCache
                return p;
        }
        
+       //This method is for hard removal of an entry, w/o maintaining eviction 
data structures
+       public static void removeEntry(LineageItem key) {
+               boolean p = _cache.containsKey(key);
+               if (!p) return;
+               synchronized(_cache) {
+                       LineageCacheEntry e = getEntry(key);
+                       long size = e.getSize();
+                       if (e._origItem == null)
+                               _cache.remove(e._key);
+
+                       else {
+                               LineageCacheEntry h = _cache.get(e._origItem); 
//head
+                               while (h != null) {
+                                       LineageCacheEntry tmp = h;
+                                       h = h._nextEntry;
+                                       _cache.remove(tmp._key);
+                               }
+                       }
+                       LineageCacheEviction.updateSize(size, false);
+               }
+       }
+       
        public static MatrixBlock getMatrix(LineageItem key) {
                LineageCacheEntry e = null;
                synchronized( _cache ) {
diff --git 
a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheEviction.java 
b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheEviction.java
index c64d49b..79c1e41 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheEviction.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheEviction.java
@@ -33,7 +33,7 @@ import org.apache.sysds.runtime.util.LocalFileUtils;
 
 public class LineageCacheEviction
 {
-       private static long _cachesize = 0;
+       protected static long _cachesize = 0;
        private static long CACHE_LIMIT; //limit in bytes
        private static long _startTimestamp = 0;
        protected static final Map<LineageItem, Integer> _removelist = new 
HashMap<>();
diff --git 
a/src/main/java/org/apache/sysds/runtime/lineage/LineageDedupUtils.java 
b/src/main/java/org/apache/sysds/runtime/lineage/LineageDedupUtils.java
index b781bbd..396c287 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageDedupUtils.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageDedupUtils.java
@@ -108,10 +108,15 @@ public class LineageDedupUtils {
                LineageItem[] liinputs = 
LineageItemUtils.getLineageItemInputstoSB(inputnames, ec);
                // TODO: find the inputs from the ProgramBlock instead of 
StatementBlock
                String ph = LineageItemUtils.LPLACEHOLDER;
-               for (int i=0; i<liinputs.length; i++) {
+               int i = 0;
+               for (String input : inputnames) {
+                       // Skip empty variables to correctly map lineage items 
to live variables
+                       if (ec.getVariable(input) == null)
+                               continue;
                        // Wrap the inputs with order-preserving placeholders.
                        LineageItem phInput = new 
LineageItem(ph+String.valueOf(i), new LineageItem[] {liinputs[i]});
-                       _tmpLineage.set(inputnames.get(i), phInput);
+                       _tmpLineage.set(input, phInput);
+                       i++;
                }
                // also copy the dedupblock to trace the taken path (bitset)
                _tmpLineage.setDedupBlock(ldb);
@@ -194,6 +199,11 @@ public class LineageDedupUtils {
                        }
                        root.resetVisitStatusNR();
                        cutAtPlaceholder(root);
+                       if (!LineageCacheConfig.ReuseCacheType.isNone())
+                               // These chopped DAGs can lead to incorrect 
reuse.
+                               // FIXME: This logic removes only the live 
variables from lineage cache. 
+                               //        Need a way to remove all entries that 
are cached in this iteration.
+                               LineageCache.removeEntry(root);
                }
                lmap.getTraces().keySet().removeAll(emptyRoots);
        }
diff --git 
a/src/test/java/org/apache/sysds/test/functions/lineage/DedupReuseTest.java 
b/src/test/java/org/apache/sysds/test/functions/lineage/DedupReuseTest.java
index a19eaa3..640cb74 100644
--- a/src/test/java/org/apache/sysds/test/functions/lineage/DedupReuseTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/lineage/DedupReuseTest.java
@@ -44,6 +44,7 @@ public class DedupReuseTest extends AutomatedTestBase
        protected static final String TEST_NAME3 = "DedupReuse3"; 
        protected static final String TEST_NAME4 = "DedupReuse4"; 
        protected static final String TEST_NAME5 = "DedupReuse5"; 
+       protected static final String TEST_NAME6 = "DedupReuse6"; 
 
        protected String TEST_CLASS_DIR = TEST_DIR + 
DedupReuseTest.class.getSimpleName() + "/";
        
@@ -54,7 +55,7 @@ public class DedupReuseTest extends AutomatedTestBase
        @Override
        public void setUp() {
                TestUtils.clearAssertionInformation();
-               for(int i=1; i<=5; i++)
+               for(int i=1; i<=6; i++)
                        addTestConfiguration(TEST_NAME+i, new 
TestConfiguration(TEST_CLASS_DIR, TEST_NAME+i));
        }
        
@@ -87,6 +88,12 @@ public class DedupReuseTest extends AutomatedTestBase
                // Reuse an operation for each iteration of a dedup loop
                testLineageTrace(TEST_NAME4);
        }
+
+       @Test
+       public void testLineageTrace6() {
+               // Reuse minibatch-wise preprocessing in a mini-batch like 
scenario
+               testLineageTrace(TEST_NAME6);
+       }
        
        public void testLineageTrace(String testname) {
                boolean old_simplification = 
OptimizerUtils.ALLOW_ALGEBRAIC_SIMPLIFICATION;
diff --git a/src/test/scripts/functions/lineage/DedupReuse6.dml 
b/src/test/scripts/functions/lineage/DedupReuse6.dml
new file mode 100644
index 0000000..d1263cd
--- /dev/null
+++ b/src/test/scripts/functions/lineage/DedupReuse6.dml
@@ -0,0 +1,56 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+D = rand(rows=1000, cols=784, min=0, max=20, seed=42)
+bs = 128;
+ep = 5;
+iter_ep = ceil(nrow(D)/bs);
+maxiter = ep * iter_ep;
+beg = 1;
+iter = 0;
+i = 1;
+
+while (iter < maxiter) {
+  end = beg + bs - 1;
+  if (end>nrow(D))
+    end = nrow(D);
+  X = D[beg:end,]
+
+  #reusable OP across epochs
+  X = scale(X, TRUE, TRUE);
+
+  #not reusable OPs
+  X = ((X + X) * i - X) / (i+1)
+  X = ((X + X) * i - X) / (i+1)
+  X = ((X + X) * i - X) / (i+1)
+  X = ((X + X) * i - X) / (i+1)
+  X = ((X + X) * i - X) / (i+1)
+
+  iter = iter + 1;
+  if (end == nrow(D))
+    beg = 1;
+  else
+    beg = end + 1;
+  i = i + 1;
+
+}
+write(X, $1, format="text");
+

Reply via email to