This is an automated email from the ASF dual-hosted git repository.

arnabp20 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/main by this push:
     new 03c546fe33 [SYSTEMDS-3470] Generalize selecting reusable Spark instructions
03c546fe33 is described below

commit 03c546fe3392759b18529db0ccb3d26328b02e0f
Author: Arnab Phani <[email protected]>
AuthorDate: Tue Nov 29 00:05:01 2022 +0100

    [SYSTEMDS-3470] Generalize selecting reusable Spark instructions
    
    This patch allows putting any MatrixObject that does not have a valid
    RDD into the lineage cache. This change makes it easier to separate
    out Spark instructions that return intermediates back to local.
    
    Closes #1742
---
 .../sysds/runtime/controlprogram/caching/CacheableData.java |  4 ++++
 .../java/org/apache/sysds/runtime/lineage/LineageCache.java |  6 ++++++
 .../apache/sysds/runtime/lineage/LineageCacheConfig.java    |  2 +-
 .../sysds/test/functions/async/LineageReuseSparkTest.java   | 13 ++++++++++---
 .../test/functions/builtin/part2/BuiltinNaLocfTest.java     |  2 ++
 5 files changed, 23 insertions(+), 4 deletions(-)

diff --git 
a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java
 
b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java
index 57cd49791b..9d195ea7fd 100644
--- 
a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java
+++ 
b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java
@@ -442,6 +442,10 @@ public abstract class CacheableData<T extends 
CacheBlock<?>> extends Data
                if( _rddHandle != null )
                        rdd.setBackReference(this);
        }
+
+       public boolean hasRDDHandle() {
+               return _rddHandle != null && _rddHandle.hasBackReference();
+       }
        
        public BroadcastObject<T> getBroadcastHandle() {
                return _bcHandle;
diff --git a/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java 
b/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
index 4c79b885b7..ecd734c588 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
@@ -579,6 +579,12 @@ public class LineageCache
                                        continue;
                                }
 
+                               if (data instanceof MatrixObject && 
((MatrixObject) data).hasRDDHandle()) {
+                                       // Avoid triggering pre-matured Spark 
instruction chains
+                                       removePlaceholder(item);
+                                       continue;
+                               }
+
                                if (LineageCacheConfig.isOutputFederated(inst, 
data)) {
                                        // Do not cache federated outputs (in 
the coordinator)
                                        // Cannot skip putting the placeholder 
as the above is only known after execution
diff --git 
a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheConfig.java 
b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheConfig.java
index 4259481444..5a7c46dfe7 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheConfig.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheConfig.java
@@ -206,7 +206,7 @@ public class LineageCacheConfig
                boolean insttype = (inst instanceof ComputationCPInstruction 
                        || inst instanceof ComputationFEDInstruction
                        || inst instanceof GPUInstruction
-                       || (inst instanceof ComputationSPInstruction && 
isRightSparkOp(inst)))
+                       || inst instanceof ComputationSPInstruction)
                        && !(inst instanceof ListIndexingCPInstruction);
                boolean rightop = (ArrayUtils.contains(REUSE_OPCODES, 
inst.getOpcode())
                        || (inst.getOpcode().equals("append") && 
isVectorAppend(inst, ec))
diff --git 
a/src/test/java/org/apache/sysds/test/functions/async/LineageReuseSparkTest.java
 
b/src/test/java/org/apache/sysds/test/functions/async/LineageReuseSparkTest.java
index 83992c5772..e958093122 100644
--- 
a/src/test/java/org/apache/sysds/test/functions/async/LineageReuseSparkTest.java
+++ 
b/src/test/java/org/apache/sysds/test/functions/async/LineageReuseSparkTest.java
@@ -51,15 +51,22 @@ public class LineageReuseSparkTest extends 
AutomatedTestBase {
        }
 
        @Test
-       public void testlmds() {
-               runTest(TEST_NAME+"1");
+       public void testlmdsHB() {
+               runTest(TEST_NAME+"1", ExecMode.HYBRID);
        }
 
-       public void runTest(String testname) {
+       @Test
+       public void testlmdsSP() {
+               // Only reuse the actions
+               runTest(TEST_NAME+"1", ExecMode.SPARK);
+       }
+
+       public void runTest(String testname, ExecMode execMode) {
                boolean old_simplification = 
OptimizerUtils.ALLOW_ALGEBRAIC_SIMPLIFICATION;
                boolean old_sum_product = 
OptimizerUtils.ALLOW_SUM_PRODUCT_REWRITES;
                boolean old_trans_exec_type = 
OptimizerUtils.ALLOW_TRANSITIVE_SPARK_EXEC_TYPE;
                ExecMode oldPlatform = setExecMode(ExecMode.HYBRID);
+               rtplatform = execMode;
 
                long oldmem = InfrastructureAnalyzer.getLocalMaxMemory();
                long mem = 1024*1024*8;
diff --git 
a/src/test/java/org/apache/sysds/test/functions/builtin/part2/BuiltinNaLocfTest.java
 
b/src/test/java/org/apache/sysds/test/functions/builtin/part2/BuiltinNaLocfTest.java
index 4d9ddac658..64999e46ae 100644
--- 
a/src/test/java/org/apache/sysds/test/functions/builtin/part2/BuiltinNaLocfTest.java
+++ 
b/src/test/java/org/apache/sysds/test/functions/builtin/part2/BuiltinNaLocfTest.java
@@ -22,6 +22,7 @@ package org.apache.sysds.test.functions.builtin.part2;
 import org.apache.commons.lang.ArrayUtils;
 import org.apache.sysds.common.Types;
 import org.apache.sysds.common.Types.ExecType;
+import org.apache.sysds.runtime.lineage.Lineage;
 import org.apache.sysds.runtime.lineage.LineageCacheConfig.ReuseCacheType;
 import org.apache.sysds.runtime.matrix.data.MatrixValue;
 import org.apache.sysds.test.AutomatedTestBase;
@@ -105,6 +106,7 @@ public class BuiltinNaLocfTest extends AutomatedTestBase {
                        double[][] A = getRandomMatrix(rows, cols, -10, 10, 
0.6, 7);
                        writeInputMatrixWithMTD("A", A, true);
 
+                       Lineage.resetInternalState();
                        runTest(true, false, null, -1);
                        runRScript(true);
                        //compare matrices

Reply via email to