[SYSTEMML-2474] Support for list result variables in parfor

This patch introduces support for list data types as parfor result
variables. In detail, this includes (1) a generalized parfor dependency
analysis for lists, (2) a hardened parfor optimizer for list data types,
and (3) a dedicated result merge procedure for lists.


Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/430c04d5
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/430c04d5
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/430c04d5

Branch: refs/heads/master
Commit: 430c04d5988cb542b2037938b239f9d69706d7c5
Parents: f2c0d13
Author: Matthias Boehm <[email protected]>
Authored: Mon Jul 30 22:06:49 2018 -0700
Committer: Matthias Boehm <[email protected]>
Committed: Mon Jul 30 22:06:49 2018 -0700

----------------------------------------------------------------------
 .../sysml/parser/ParForStatementBlock.java      | 13 ++--
 .../controlprogram/ParForProgramBlock.java      | 37 +++++++++---
 .../parfor/opt/OptimizerRuleBased.java          | 22 +++----
 .../runtime/instructions/cp/ListObject.java     |  7 +++
 .../parfor/ParForDependencyAnalysisTest.java    | 10 ++--
 .../parfor/ParForListResultVarsTest.java        | 63 ++++++++++++++++++++
 src/test/scripts/functions/parfor/parfor54d.dml | 28 +++++++++
 .../functions/parfor/parfor_listResults.dml     | 33 ++++++++++
 .../functions/parfor/ZPackageSuite.java         |  1 +
 9 files changed, 184 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/430c04d5/src/main/java/org/apache/sysml/parser/ParForStatementBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/parser/ParForStatementBlock.java 
b/src/main/java/org/apache/sysml/parser/ParForStatementBlock.java
index 9c752ae..6882102 100644
--- a/src/main/java/org/apache/sysml/parser/ParForStatementBlock.java
+++ b/src/main/java/org/apache/sysml/parser/ParForStatementBlock.java
@@ -686,10 +686,10 @@ public class ParForStatementBlock extends 
ForStatementBlock
                                                for(DataIdentifier write : 
datsUpdated) {
                                                        if( !c._var.equals( 
write.getName() ) ) continue;
                                                        
-                                                       if( cdt != 
DataType.MATRIX ) {
+                                                       if( cdt != 
DataType.MATRIX && cdt != DataType.LIST ) {
                                                                //cannot infer 
type, need to exit (conservative approach)
-                                                               throw new 
LanguageException("PARFOR loop dependency analysis: "
-                                                                       + 
"cannot check for dependencies due to unknown datatype of var '"+c._var+"'.");
+                                                               throw new 
LanguageException("PARFOR loop dependency analysis: cannot check "
+                                                                       + "for 
dependencies due to unknown datatype of var '"+c._var+"': "+cdt.name()+".");
                                                        }
                                                        
                                                        DataIdentifier dat2 = 
write;
@@ -724,7 +724,8 @@ public class ParForStatementBlock extends ForStatementBlock
                                                        if( 
ABORT_ON_FIRST_DEPENDENCY )
                                                                return;
                                                }
-                                               else if( cdt == DataType.MATRIX 
&& dat2dt == DataType.MATRIX )
+                                               else if( (cdt == 
DataType.MATRIX && dat2dt == DataType.MATRIX)
+                                                       || (cdt == 
DataType.LIST && dat2dt == DataType.LIST ) )
                                                {
                                                        boolean invalid = false;
                                                        if( 
runEqualsCheck(c._dat, dat2) )
@@ -746,8 +747,8 @@ public class ParForStatementBlock extends ForStatementBlock
                                                }
                                                else { //if( 
c._dat.getDataType() == DataType.UNKNOWN )
                                                        //cannot infer type, 
need to exit (conservative approach)
-                                                       throw new 
LanguageException("PARFOR loop dependency analysis: "
-                                                               + "cannot check 
for dependencies due to unknown datatype of var '"+c._var+"'.");
+                                                       throw new 
LanguageException("PARFOR loop dependency analysis: cannot check "
+                                                               + "for 
dependencies due to unknown datatype of var '"+c._var+"': "+cdt.name()+".");
                                                }
                                        }
                                }

http://git-wip-us.apache.org/repos/asf/systemml/blob/430c04d5/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java 
b/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java
index ba490f3..85c4c31 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java
@@ -1670,19 +1670,17 @@ public class ParForProgramBlock extends ForProgramBlock
                        for( ResultVar var : _resultVars ) //foreach non-local 
write
                        {
                                Data dat = ec.getVariable(var._name);
+                               
                                if( dat instanceof MatrixObject ) //robustness 
scalars
                                {
                                        MatrixObject out = (MatrixObject) dat;
-                                       MatrixObject[] in = new MatrixObject[ 
results.length ];
-                                       for( int i=0; i< results.length; i++ )
-                                               in[i] = (MatrixObject) 
results[i].get( var._name );
+                                       MatrixObject[] in = 
Arrays.stream(results).map(vars -> 
+                                               
vars.get(var._name)).toArray(MatrixObject[]::new);
                                        String fname = 
constructResultMergeFileName();
                                        ResultMerge rm = 
createResultMerge(_resultMerge, out, in, fname, var._isAccum, ec);
-                                       MatrixObject outNew = null;
-                                       if( USE_PARALLEL_RESULT_MERGE )
-                                               outNew = 
rm.executeParallelMerge( _numThreads );
-                                       else
-                                               outNew = 
rm.executeSerialMerge();
+                                       MatrixObject outNew = 
USE_PARALLEL_RESULT_MERGE ?
+                                               
rm.executeParallelMerge(_numThreads) :
+                                               rm.executeSerialMerge();
                                        
                                        //cleanup existing var
                                        Data exdata = 
ec.removeVariable(var._name);
@@ -1691,10 +1689,31 @@ public class ParForProgramBlock extends ForProgramBlock
                                        
                                        //cleanup of intermediate result 
variables
                                        cleanWorkerResultVariables( ec, out, in 
);
-                                       
+
                                        //set merged result variable
                                        ec.setVariable(var._name, outNew);
                                }
+                               else if(dat instanceof ListObject) {
+                                       ListObject oldList = (ListObject) dat;
+                                       ListObject newList = new 
ListObject(oldList);
+                                       ListObject[] in = 
Arrays.stream(results).map(vars -> 
+                                               
vars.get(var._name)).toArray(ListObject[]::new);
+                                       
+                                       //merge modified list entries into 
result
+                                       for(int i=0; i<oldList.getLength(); 
i++) {
+                                               Data compare = oldList.slice(i);
+                                               for( int j=0; j<in.length; j++ 
) {
+                                                       Data tmp = 
in[j].slice(i);
+                                                       if( compare != tmp ) {
+                                                               newList.set(i, 
tmp);
+                                                               break; //inner 
for loop
+                                                       }
+                                               }
+                                       }
+                                       
+                                       //set merged result variable
+                                       ec.setVariable(var._name, newList);
+                               }
                        }
                }
                

http://git-wip-us.apache.org/repos/asf/systemml/blob/430c04d5/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
index cbc8d8d..9f6db11 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
@@ -417,7 +417,8 @@ public class OptimizerRuleBased extends Optimizer
                                double mem = getMemoryEstimate(c, vars);
                                if( dpf != PartitionFormat.NONE 
                                        && dpf._dpf != 
PDataPartitionFormat.BLOCK_WISE_M_N
-                                       && (constrained || (mem > _lm/2 && mem 
> _rm/2)) ) {
+                                       && (constrained || (mem > _lm/2 && mem 
> _rm/2))
+                                       && !vars.get(c).getDataType().isList() 
) {
                                        cand2.put( c, dpf );
                                }
                        }
@@ -2039,7 +2040,6 @@ public class OptimizerRuleBased extends Optimizer
                        if( dat instanceof MatrixObject && 
((MatrixObject)dat).getNnz()!=0     //subject to result merge with compare
                                && n.hasOnlySimpleChilds()                      
                   //guaranteed no conditional indexing 
                                && rContainsResultFullReplace(n, rvar._name, 
itervar, (MatrixObject)dat) //guaranteed full matrix replace 
-                               //&& 
!pfsb.variablesRead().containsVariable(rvar)                  //never read 
variable in loop body
                                && !rIsReadInRightIndexing(n, rvar._name)       
                   //never read variable in loop body
                                && 
((MatrixObject)dat).getNumRows()<=Integer.MAX_VALUE
                                && 
((MatrixObject)dat).getNumColumns()<=Integer.MAX_VALUE )
@@ -2334,15 +2334,17 @@ public class OptimizerRuleBased extends Optimizer
                                LeftIndexingOp hop = (LeftIndexingOp) 
OptTreeConverter.getAbstractPlanMapping().getMappedHop(n.getID());
                                //check agains set of varname
                                String varName = 
hop.getInput().get(0).getName();
-                               if( ResultVar.contains(resultVars, varName) && 
vars.keySet().contains(varName) )
-                               {
+                               if( ResultVar.contains(resultVars, varName) && 
vars.keySet().contains(varName) ) {
+                                       Data dat = 
vars.get(hop.getInput().get(0).getName());
                                        //dims of result vars must be known at 
this point in time
-                                       MatrixObject mo = (MatrixObject) 
vars.get( hop.getInput().get(0).getName() );
-                                       long rows = mo.getNumRows();
-                                       long cols = mo.getNumColumns();
-                                       double memBudget = inLocal ? 
OptimizerUtils.getLocalMemBudget() : 
-                                                                        
OptimizerUtils.getRemoteMemBudgetMap();
-                                       ret &= isInMemoryResultMerge(rows, 
cols, memBudget);
+                                       if( dat instanceof MatrixObject ) {
+                                               MatrixObject mo = 
(MatrixObject) dat;
+                                               long rows = mo.getNumRows();
+                                               long cols = mo.getNumColumns();
+                                               double memBudget = inLocal ? 
OptimizerUtils.getLocalMemBudget() : 
+                                                                               
 OptimizerUtils.getRemoteMemBudgetMap();
+                                               ret &= 
isInMemoryResultMerge(rows, cols, memBudget);
+                                       }
                                }
                        }
                }

http://git-wip-us.apache.org/repos/asf/systemml/blob/430c04d5/src/main/java/org/apache/sysml/runtime/instructions/cp/ListObject.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/cp/ListObject.java 
b/src/main/java/org/apache/sysml/runtime/instructions/cp/ListObject.java
index d41dfc5..1468c76 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/cp/ListObject.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/cp/ListObject.java
@@ -47,6 +47,13 @@ public class ListObject extends Data {
                _names = names;
        }
        
+       public ListObject(ListObject that) { //copy ctor: fresh backing lists, but the entries (Data objects) themselves are shared
+               this(new ArrayList<>(that._data), (that._names != null) ?
+                       new ArrayList<>(that._names) : null);
+               if( that._dataState != null ) //status flags are copied only if the source list has them
+                       _dataState = Arrays.copyOf(that._dataState, 
getLength());
+       }
+       
        public void setStatus(boolean[] status) {
                _dataState = status;
        }

http://git-wip-us.apache.org/repos/asf/systemml/blob/430c04d5/src/test/java/org/apache/sysml/test/integration/functions/parfor/ParForDependencyAnalysisTest.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/sysml/test/integration/functions/parfor/ParForDependencyAnalysisTest.java
 
b/src/test/java/org/apache/sysml/test/integration/functions/parfor/ParForDependencyAnalysisTest.java
index 97f253b..50c2ef2 100644
--- 
a/src/test/java/org/apache/sysml/test/integration/functions/parfor/ParForDependencyAnalysisTest.java
+++ 
b/src/test/java/org/apache/sysml/test/integration/functions/parfor/ParForDependencyAnalysisTest.java
@@ -67,7 +67,7 @@ import org.junit.Test;
  * * accumulators
  *    53a: no, 53b dep, 53c dep, 53d dep, 53e dep
  * * lists
- *    54a: no, 54b: dep, 54c: dep
+ *    54a: no, 54b: no, 54c: dep, 54d: dep
  */
 public class ParForDependencyAnalysisTest extends AutomatedTestBase
 {
@@ -76,9 +76,7 @@ public class ParForDependencyAnalysisTest extends 
AutomatedTestBase
        private static final String TEST_CLASS_DIR = TEST_DIR + 
ParForDependencyAnalysisTest.class.getSimpleName() + "/";
        
        @Override
-       public void setUp() {
-               
-       }
+       public void setUp() {}
        
        @Test
        public void testDependencyAnalysis1() { runTest("parfor1.dml", false); }
@@ -322,11 +320,13 @@ public class ParForDependencyAnalysisTest extends 
AutomatedTestBase
        public void testDependencyAnalysis54a() { runTest("parfor54a.dml", 
false); }
        
        @Test
-       public void testDependencyAnalysis54b() { runTest("parfor54b.dml", 
true); }
+       public void testDependencyAnalysis54b() { runTest("parfor54b.dml", 
false); }
        
        @Test
        public void testDependencyAnalysis54c() { runTest("parfor54c.dml", 
true); }
        
+       @Test //54d: iteration i reads list entry C[i-1] written by iteration i-1, so a dependency error is expected
+       public void testDependencyAnalysis54d() { runTest("parfor54d.dml", 
true); }
        
        private void runTest( String scriptFilename, boolean expectedException 
) {
                boolean raisedException = false;

http://git-wip-us.apache.org/repos/asf/systemml/blob/430c04d5/src/test/java/org/apache/sysml/test/integration/functions/parfor/ParForListResultVarsTest.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/sysml/test/integration/functions/parfor/ParForListResultVarsTest.java
 
b/src/test/java/org/apache/sysml/test/integration/functions/parfor/ParForListResultVarsTest.java
new file mode 100644
index 0000000..681ba60
--- /dev/null
+++ 
b/src/test/java/org/apache/sysml/test/integration/functions/parfor/ParForListResultVarsTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.test.integration.functions.parfor;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.sysml.runtime.matrix.data.MatrixValue.CellIndex;
+import org.apache.sysml.test.integration.AutomatedTestBase;
+import org.apache.sysml.test.integration.TestConfiguration;
+
+public class ParForListResultVarsTest extends AutomatedTestBase
+{
+       private final static String TEST_DIR = "functions/parfor/";
+       private final static String TEST_NAME1 = "parfor_listResults";
+       private final static String TEST_CLASS_DIR = TEST_DIR + ParForListResultVarsTest.class.getSimpleName() + "/";
+       /** Registers the parfor_listResults script configuration with its single output "R". */
+       @Override
+       public void setUp() {
+               addTestConfiguration(TEST_NAME1,
+                       new TestConfiguration(TEST_CLASS_DIR, TEST_NAME1, new String[] { "R" }) );
+       }
+       /** Small input: list entries start as 1 x 2 matrices. */
+       @Test
+       public void testParForListResult1a() {
+               runListResultVarTest(TEST_NAME1, 2, 1);
+       }
+       /** Larger input: list entries start as 1 x 350 matrices. */
+       @Test
+       public void testParForListResult1b() {
+               runListResultVarTest(TEST_NAME1, 35, 10);
+       }
+       /** Runs the named list-result parfor script with (rows, cols) as its first two args and checks R[1,1] == 7. */
+       private void runListResultVarTest(String testName, int rows, int cols) {
+               loadTestConfiguration(getTestConfiguration(testName));
+               //resolve the script from the requested test name (previously hard-coded to TEST_NAME1)
+               String HOME = SCRIPT_DIR + TEST_DIR;
+               fullDMLScriptName = HOME + testName + ".dml";
+               programArgs = new String[]{"-explain","-args",
+                       String.valueOf(rows), String.valueOf(cols), output("R") };
+               //R counts list entries equal to their 1-based index after the parfor merge; 7 means all entries are correct
+               runTest(true, false, null, -1);
+               Assert.assertEquals(Double.valueOf(7),
+                       readDMLMatrixFromHDFS("R").get(new CellIndex(1,1)));
+       }
+}

http://git-wip-us.apache.org/repos/asf/systemml/blob/430c04d5/src/test/scripts/functions/parfor/parfor54d.dml
----------------------------------------------------------------------
diff --git a/src/test/scripts/functions/parfor/parfor54d.dml 
b/src/test/scripts/functions/parfor/parfor54d.dml
new file mode 100644
index 0000000..c6e4d15
--- /dev/null
+++ b/src/test/scripts/functions/parfor/parfor54d.dml
@@ -0,0 +1,28 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+# Iteration i reads C[i-1], which iteration i-1 writes: a loop-carried list dependency the parfor analysis is expected to flag
+A = matrix(7, rows=2, cols=2);
+B = matrix(3, rows=2, cols=2);
+C = list(A, B, A);
+parfor( i in 2:3 )
+  C[i] = as.matrix(C[i-1])+7;
+print(sum(as.matrix(C[1])));

http://git-wip-us.apache.org/repos/asf/systemml/blob/430c04d5/src/test/scripts/functions/parfor/parfor_listResults.dml
----------------------------------------------------------------------
diff --git a/src/test/scripts/functions/parfor/parfor_listResults.dml 
b/src/test/scripts/functions/parfor/parfor_listResults.dml
new file mode 100644
index 0000000..285936b
--- /dev/null
+++ b/src/test/scripts/functions/parfor/parfor_listResults.dml
@@ -0,0 +1,33 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+# Seven 1 x ($1*$2) matrices; list entry i is filled with the value i
+A = matrix(0, 1, $1*$2);
+L = list(A+1,A+2,A+3,A+4,A+5,A+6,A+7);
+# List result variable: each parfor iteration overwrites only its own entry L[i] with its 1x1 row mean (= i)
+parfor(i in 1:length(L))
+  L[i] = rowMeans(as.matrix(L[i]));
+# Stack the merged list entries into a single column vector
+R1 = matrix(0,0,1)
+for(i in 1:length(L))
+  R1 = rbind(R1, as.matrix(L[i]));
+# Count entries equal to their index; 7 means every parfor update survived the list result merge
+R = as.matrix(sum(R1==seq(1,7)));
+write(R, $3);

http://git-wip-us.apache.org/repos/asf/systemml/blob/430c04d5/src/test_suites/java/org/apache/sysml/test/integration/functions/parfor/ZPackageSuite.java
----------------------------------------------------------------------
diff --git 
a/src/test_suites/java/org/apache/sysml/test/integration/functions/parfor/ZPackageSuite.java
 
b/src/test_suites/java/org/apache/sysml/test/integration/functions/parfor/ZPackageSuite.java
index 8844be2..7124678 100644
--- 
a/src/test_suites/java/org/apache/sysml/test/integration/functions/parfor/ZPackageSuite.java
+++ 
b/src/test_suites/java/org/apache/sysml/test/integration/functions/parfor/ZPackageSuite.java
@@ -34,6 +34,7 @@ import org.junit.runners.Suite;
        ParForDataPartitionExecuteTest.class,
        ParForDataPartitionLeftIndexingTest.class,
        ParForDependencyAnalysisTest.class,
+       ParForListResultVarsTest.class,
        ParForFunctionSerializationTest.class,
        ParForMultipleDataPartitioningTest.class,
        ParForNaNResultMergeTest.class,

Reply via email to