[SYSTEMML-2474] Support for list result variables in parfor

This patch introduces support for list data types as parfor result variables. In detail, this includes (1) a generalized parfor dependency analysis for lists, (2) a hardened parfor optimizer for list data types, and (3) a dedicated result merge procedure for lists.
Project: http://git-wip-us.apache.org/repos/asf/systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/430c04d5 Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/430c04d5 Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/430c04d5 Branch: refs/heads/master Commit: 430c04d5988cb542b2037938b239f9d69706d7c5 Parents: f2c0d13 Author: Matthias Boehm <[email protected]> Authored: Mon Jul 30 22:06:49 2018 -0700 Committer: Matthias Boehm <[email protected]> Committed: Mon Jul 30 22:06:49 2018 -0700 ---------------------------------------------------------------------- .../sysml/parser/ParForStatementBlock.java | 13 ++-- .../controlprogram/ParForProgramBlock.java | 37 +++++++++--- .../parfor/opt/OptimizerRuleBased.java | 22 +++---- .../runtime/instructions/cp/ListObject.java | 7 +++ .../parfor/ParForDependencyAnalysisTest.java | 10 ++-- .../parfor/ParForListResultVarsTest.java | 63 ++++++++++++++++++++ src/test/scripts/functions/parfor/parfor54d.dml | 28 +++++++++ .../functions/parfor/parfor_listResults.dml | 33 ++++++++++ .../functions/parfor/ZPackageSuite.java | 1 + 9 files changed, 184 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/systemml/blob/430c04d5/src/main/java/org/apache/sysml/parser/ParForStatementBlock.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/parser/ParForStatementBlock.java b/src/main/java/org/apache/sysml/parser/ParForStatementBlock.java index 9c752ae..6882102 100644 --- a/src/main/java/org/apache/sysml/parser/ParForStatementBlock.java +++ b/src/main/java/org/apache/sysml/parser/ParForStatementBlock.java @@ -686,10 +686,10 @@ public class ParForStatementBlock extends ForStatementBlock for(DataIdentifier write : datsUpdated) { if( !c._var.equals( write.getName() ) ) continue; - if( cdt != DataType.MATRIX ) { + if( cdt != DataType.MATRIX && cdt != 
DataType.LIST ) { //cannot infer type, need to exit (conservative approach) - throw new LanguageException("PARFOR loop dependency analysis: " - + "cannot check for dependencies due to unknown datatype of var '"+c._var+"'."); + throw new LanguageException("PARFOR loop dependency analysis: cannot check " + + "for dependencies due to unknown datatype of var '"+c._var+"': "+cdt.name()+"."); } DataIdentifier dat2 = write; @@ -724,7 +724,8 @@ public class ParForStatementBlock extends ForStatementBlock if( ABORT_ON_FIRST_DEPENDENCY ) return; } - else if( cdt == DataType.MATRIX && dat2dt == DataType.MATRIX ) + else if( (cdt == DataType.MATRIX && dat2dt == DataType.MATRIX) + || (cdt == DataType.LIST && dat2dt == DataType.LIST ) ) { boolean invalid = false; if( runEqualsCheck(c._dat, dat2) ) @@ -746,8 +747,8 @@ public class ParForStatementBlock extends ForStatementBlock } else { //if( c._dat.getDataType() == DataType.UNKNOWN ) //cannot infer type, need to exit (conservative approach) - throw new LanguageException("PARFOR loop dependency analysis: " - + "cannot check for dependencies due to unknown datatype of var '"+c._var+"'."); + throw new LanguageException("PARFOR loop dependency analysis: cannot check " + + "for dependencies due to unknown datatype of var '"+c._var+"': "+cdt.name()+"."); } } } http://git-wip-us.apache.org/repos/asf/systemml/blob/430c04d5/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java b/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java index ba490f3..85c4c31 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java @@ -1670,19 +1670,17 @@ public class ParForProgramBlock extends ForProgramBlock for( ResultVar var : 
_resultVars ) //foreach non-local write { Data dat = ec.getVariable(var._name); + if( dat instanceof MatrixObject ) //robustness scalars { MatrixObject out = (MatrixObject) dat; - MatrixObject[] in = new MatrixObject[ results.length ]; - for( int i=0; i< results.length; i++ ) - in[i] = (MatrixObject) results[i].get( var._name ); + MatrixObject[] in = Arrays.stream(results).map(vars -> + vars.get(var._name)).toArray(MatrixObject[]::new); String fname = constructResultMergeFileName(); ResultMerge rm = createResultMerge(_resultMerge, out, in, fname, var._isAccum, ec); - MatrixObject outNew = null; - if( USE_PARALLEL_RESULT_MERGE ) - outNew = rm.executeParallelMerge( _numThreads ); - else - outNew = rm.executeSerialMerge(); + MatrixObject outNew = USE_PARALLEL_RESULT_MERGE ? + rm.executeParallelMerge(_numThreads) : + rm.executeSerialMerge(); //cleanup existing var Data exdata = ec.removeVariable(var._name); @@ -1691,10 +1689,31 @@ public class ParForProgramBlock extends ForProgramBlock //cleanup of intermediate result variables cleanWorkerResultVariables( ec, out, in ); - + //set merged result variable ec.setVariable(var._name, outNew); } + else if(dat instanceof ListObject) { + ListObject oldList = (ListObject) dat; + ListObject newList = new ListObject(oldList); + ListObject[] in = Arrays.stream(results).map(vars -> + vars.get(var._name)).toArray(ListObject[]::new); + + //merge modified list entries into result + for(int i=0; i<oldList.getLength(); i++) { + Data compare = oldList.slice(i); + for( int j=0; j<in.length; j++ ) { + Data tmp = in[j].slice(i); + if( compare != tmp ) { + newList.set(i, tmp); + break; //inner for loop + } + } + } + + //set merged result variable + ec.setVariable(var._name, newList); + } } } http://git-wip-us.apache.org/repos/asf/systemml/blob/430c04d5/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java ---------------------------------------------------------------------- diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java index cbc8d8d..9f6db11 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java @@ -417,7 +417,8 @@ public class OptimizerRuleBased extends Optimizer double mem = getMemoryEstimate(c, vars); if( dpf != PartitionFormat.NONE && dpf._dpf != PDataPartitionFormat.BLOCK_WISE_M_N - && (constrained || (mem > _lm/2 && mem > _rm/2)) ) { + && (constrained || (mem > _lm/2 && mem > _rm/2)) + && !vars.get(c).getDataType().isList() ) { cand2.put( c, dpf ); } } @@ -2039,7 +2040,6 @@ public class OptimizerRuleBased extends Optimizer if( dat instanceof MatrixObject && ((MatrixObject)dat).getNnz()!=0 //subject to result merge with compare && n.hasOnlySimpleChilds() //guaranteed no conditional indexing && rContainsResultFullReplace(n, rvar._name, itervar, (MatrixObject)dat) //guaranteed full matrix replace - //&& !pfsb.variablesRead().containsVariable(rvar) //never read variable in loop body && !rIsReadInRightIndexing(n, rvar._name) //never read variable in loop body && ((MatrixObject)dat).getNumRows()<=Integer.MAX_VALUE && ((MatrixObject)dat).getNumColumns()<=Integer.MAX_VALUE ) @@ -2334,15 +2334,17 @@ public class OptimizerRuleBased extends Optimizer LeftIndexingOp hop = (LeftIndexingOp) OptTreeConverter.getAbstractPlanMapping().getMappedHop(n.getID()); //check agains set of varname String varName = hop.getInput().get(0).getName(); - if( ResultVar.contains(resultVars, varName) && vars.keySet().contains(varName) ) - { + if( ResultVar.contains(resultVars, varName) && vars.keySet().contains(varName) ) { + Data dat = vars.get(hop.getInput().get(0).getName()); //dims of result vars must be known at this point in time - MatrixObject mo = (MatrixObject) vars.get( 
hop.getInput().get(0).getName() ); - long rows = mo.getNumRows(); - long cols = mo.getNumColumns(); - double memBudget = inLocal ? OptimizerUtils.getLocalMemBudget() : - OptimizerUtils.getRemoteMemBudgetMap(); - ret &= isInMemoryResultMerge(rows, cols, memBudget); + if( dat instanceof MatrixObject ) { + MatrixObject mo = (MatrixObject) dat; + long rows = mo.getNumRows(); + long cols = mo.getNumColumns(); + double memBudget = inLocal ? OptimizerUtils.getLocalMemBudget() : + OptimizerUtils.getRemoteMemBudgetMap(); + ret &= isInMemoryResultMerge(rows, cols, memBudget); + } } } } http://git-wip-us.apache.org/repos/asf/systemml/blob/430c04d5/src/main/java/org/apache/sysml/runtime/instructions/cp/ListObject.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/cp/ListObject.java b/src/main/java/org/apache/sysml/runtime/instructions/cp/ListObject.java index d41dfc5..1468c76 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/cp/ListObject.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/cp/ListObject.java @@ -47,6 +47,13 @@ public class ListObject extends Data { _names = names; } + public ListObject(ListObject that) { + this(new ArrayList<>(that._data), (that._names != null) ? 
+ new ArrayList<>(that._names) : null); + if( that._dataState != null ) + _dataState = Arrays.copyOf(that._dataState, getLength()); + } + public void setStatus(boolean[] status) { _dataState = status; } http://git-wip-us.apache.org/repos/asf/systemml/blob/430c04d5/src/test/java/org/apache/sysml/test/integration/functions/parfor/ParForDependencyAnalysisTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/sysml/test/integration/functions/parfor/ParForDependencyAnalysisTest.java b/src/test/java/org/apache/sysml/test/integration/functions/parfor/ParForDependencyAnalysisTest.java index 97f253b..50c2ef2 100644 --- a/src/test/java/org/apache/sysml/test/integration/functions/parfor/ParForDependencyAnalysisTest.java +++ b/src/test/java/org/apache/sysml/test/integration/functions/parfor/ParForDependencyAnalysisTest.java @@ -67,7 +67,7 @@ import org.junit.Test; * * accumulators * 53a: no, 53b dep, 53c dep, 53d dep, 53e dep * * lists - * 54a: no, 54b: dep, 54c: dep + * 54a: no, 54b: no, 54c: dep, 54d: dep */ public class ParForDependencyAnalysisTest extends AutomatedTestBase { @@ -76,9 +76,7 @@ public class ParForDependencyAnalysisTest extends AutomatedTestBase private static final String TEST_CLASS_DIR = TEST_DIR + ParForDependencyAnalysisTest.class.getSimpleName() + "/"; @Override - public void setUp() { - - } + public void setUp() {} @Test public void testDependencyAnalysis1() { runTest("parfor1.dml", false); } @@ -322,11 +320,13 @@ public class ParForDependencyAnalysisTest extends AutomatedTestBase public void testDependencyAnalysis54a() { runTest("parfor54a.dml", false); } @Test - public void testDependencyAnalysis54b() { runTest("parfor54b.dml", true); } + public void testDependencyAnalysis54b() { runTest("parfor54b.dml", false); } @Test public void testDependencyAnalysis54c() { runTest("parfor54c.dml", true); } + @Test + public void testDependencyAnalysis54d() { runTest("parfor54d.dml", true); } private 
void runTest( String scriptFilename, boolean expectedException ) { boolean raisedException = false; http://git-wip-us.apache.org/repos/asf/systemml/blob/430c04d5/src/test/java/org/apache/sysml/test/integration/functions/parfor/ParForListResultVarsTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/sysml/test/integration/functions/parfor/ParForListResultVarsTest.java b/src/test/java/org/apache/sysml/test/integration/functions/parfor/ParForListResultVarsTest.java new file mode 100644 index 0000000..681ba60 --- /dev/null +++ b/src/test/java/org/apache/sysml/test/integration/functions/parfor/ParForListResultVarsTest.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.sysml.test.integration.functions.parfor; + +import org.junit.Assert; +import org.junit.Test; + +import org.apache.sysml.runtime.matrix.data.MatrixValue.CellIndex; +import org.apache.sysml.test.integration.AutomatedTestBase; +import org.apache.sysml.test.integration.TestConfiguration; + +public class ParForListResultVarsTest extends AutomatedTestBase +{ + private final static String TEST_DIR = "functions/parfor/"; + private final static String TEST_NAME1 = "parfor_listResults"; + private final static String TEST_CLASS_DIR = TEST_DIR + ParForListResultVarsTest.class.getSimpleName() + "/"; + + @Override + public void setUp() { + addTestConfiguration(TEST_NAME1, + new TestConfiguration(TEST_CLASS_DIR, TEST_NAME1, new String[] { "R" }) ); + } + + @Test + public void testParForListResult1a() { + runListResultVarTest(TEST_NAME1, 2, 1); + } + + @Test + public void testParForListResult1b() { + runListResultVarTest(TEST_NAME1, 35, 10); + } + + private void runListResultVarTest(String testName, int rows, int cols) { + loadTestConfiguration(getTestConfiguration(testName)); + + String HOME = SCRIPT_DIR + TEST_DIR; + fullDMLScriptName = HOME + TEST_NAME1 + ".dml"; + programArgs = new String[]{"-explain","-args", + String.valueOf(rows), String.valueOf(cols), output("R") }; + + runTest(true, false, null, -1); + Assert.assertEquals(new Double(7), + readDMLMatrixFromHDFS("R").get(new CellIndex(1,1))); + } +} http://git-wip-us.apache.org/repos/asf/systemml/blob/430c04d5/src/test/scripts/functions/parfor/parfor54d.dml ---------------------------------------------------------------------- diff --git a/src/test/scripts/functions/parfor/parfor54d.dml b/src/test/scripts/functions/parfor/parfor54d.dml new file mode 100644 index 0000000..c6e4d15 --- /dev/null +++ b/src/test/scripts/functions/parfor/parfor54d.dml @@ -0,0 +1,28 @@ +#------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or 
more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +#------------------------------------------------------------- + + +A = matrix(7, rows=2, cols=2); +B = matrix(3, rows=2, cols=2); +C = list(A, B, A); +parfor( i in 2:3 ) + C[i] = as.matrix(C[i-1])+7; +print(sum(as.matrix(C[1]))); http://git-wip-us.apache.org/repos/asf/systemml/blob/430c04d5/src/test/scripts/functions/parfor/parfor_listResults.dml ---------------------------------------------------------------------- diff --git a/src/test/scripts/functions/parfor/parfor_listResults.dml b/src/test/scripts/functions/parfor/parfor_listResults.dml new file mode 100644 index 0000000..285936b --- /dev/null +++ b/src/test/scripts/functions/parfor/parfor_listResults.dml @@ -0,0 +1,33 @@ +#------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +#------------------------------------------------------------- + +A = matrix(0, 1, $1*$2); +L = list(A+1,A+2,A+3,A+4,A+5,A+6,A+7); + +parfor(i in 1:length(L)) + L[i] = rowMeans(as.matrix(L[i])); + +R1 = matrix(0,0,1) +for(i in 1:length(L)) + R1 = rbind(R1, as.matrix(L[i])); + +R = as.matrix(sum(R1==seq(1,7))); +write(R, $3); http://git-wip-us.apache.org/repos/asf/systemml/blob/430c04d5/src/test_suites/java/org/apache/sysml/test/integration/functions/parfor/ZPackageSuite.java ---------------------------------------------------------------------- diff --git a/src/test_suites/java/org/apache/sysml/test/integration/functions/parfor/ZPackageSuite.java b/src/test_suites/java/org/apache/sysml/test/integration/functions/parfor/ZPackageSuite.java index 8844be2..7124678 100644 --- a/src/test_suites/java/org/apache/sysml/test/integration/functions/parfor/ZPackageSuite.java +++ b/src/test_suites/java/org/apache/sysml/test/integration/functions/parfor/ZPackageSuite.java @@ -34,6 +34,7 @@ import org.junit.runners.Suite; ParForDataPartitionExecuteTest.class, ParForDataPartitionLeftIndexingTest.class, ParForDependencyAnalysisTest.class, + ParForListResultVarsTest.class, ParForFunctionSerializationTest.class, ParForMultipleDataPartitioningTest.class, ParForNaNResultMergeTest.class,
