[SYSTEMML-2096] Extended spark/mr parfor result merge w/ accumulators This patch completes the runtime integration of parfor result merge with accumulators (for the += operator) by all distributed MR and Spark operations as well as related tests.
Project: http://git-wip-us.apache.org/repos/asf/systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/2ef6342f Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/2ef6342f Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/2ef6342f Branch: refs/heads/master Commit: 2ef6342fa013a64cdc0a5d0aa667e60aeeecaa84 Parents: 4749851 Author: Matthias Boehm <[email protected]> Authored: Sat Jan 27 22:50:41 2018 -0800 Committer: Matthias Boehm <[email protected]> Committed: Sat Jan 27 22:50:41 2018 -0800 ---------------------------------------------------------------------- .../controlprogram/parfor/ResultMerge.java | 7 ++- .../parfor/ResultMergeLocalAutomatic.java | 2 + .../parfor/ResultMergeLocalFile.java | 14 +++--- .../parfor/ResultMergeLocalMemory.java | 2 + .../parfor/ResultMergeRemoteMR.java | 8 ++-- .../parfor/ResultMergeRemoteReducer.java | 16 ++++--- .../parfor/ResultMergeRemoteSpark.java | 30 ++++++------- .../parfor/ResultMergeRemoteSparkWCompare.java | 4 ++ .../matrix/mapred/MRJobConfiguration.java | 11 +++-- .../ParForAccumulatorResultMergeTest.java | 47 ++++++++++++++++++-- .../functions/parfor/parfor_accumulator.R | 39 ++++++++++++++++ .../functions/parfor/parfor_accumulator1.R | 39 ---------------- .../functions/parfor/parfor_accumulator2.dml | 34 ++++++++++++++ .../functions/parfor/parfor_accumulator3.dml | 34 ++++++++++++++ 14 files changed, 207 insertions(+), 80 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/systemml/blob/2ef6342f/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMerge.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMerge.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMerge.java index 5e814fa..7d43475 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMerge.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMerge.java @@ -19,6 +19,7 @@ package org.apache.sysml.runtime.controlprogram.parfor; +import java.io.Serializable; import java.util.List; import org.apache.commons.logging.Log; @@ -37,8 +38,12 @@ import org.apache.sysml.runtime.matrix.operators.BinaryOperator; * These properties allow us to realize result merging in parallel without any synchronization. * */ -public abstract class ResultMerge +public abstract class ResultMerge implements Serializable { + //note: this class needs to be serializable to ensure that all attributes of + //ResultMergeRemoteSparkWCompare are included in the task closure + private static final long serialVersionUID = 2620430969346516677L; + protected static final Log LOG = LogFactory.getLog(ResultMerge.class.getName()); protected static final String NAME_SUFFIX = "_rm"; protected static final BinaryOperator PLUS = InstructionUtils.parseBinaryOperator("+"); http://git-wip-us.apache.org/repos/asf/systemml/blob/2ef6342f/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalAutomatic.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalAutomatic.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalAutomatic.java index e9d178a..135cf9c 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalAutomatic.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalAutomatic.java @@ -29,6 +29,8 @@ import org.apache.sysml.runtime.matrix.MatrixCharacteristics; public class ResultMergeLocalAutomatic extends ResultMerge { + private static final long serialVersionUID = 1600893100602101732L; + private ResultMerge _rm = null; public ResultMergeLocalAutomatic( MatrixObject out, MatrixObject[] in, String outputFilename, boolean accum ) { http://git-wip-us.apache.org/repos/asf/systemml/blob/2ef6342f/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalFile.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalFile.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalFile.java index 049dbf1..6f987b2 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalFile.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalFile.java @@ -71,6 +71,8 @@ import org.apache.sysml.runtime.util.MapReduceTool; */ public class ResultMergeLocalFile extends ResultMerge { + private static final long serialVersionUID = -6905893742840020489L; + //NOTE: if we allow simple copies, this might result in a scattered file and many MR tasks for subsequent jobs public static final boolean ALLOW_COPY_CELLFILES = false; @@ -687,7 +689,7 @@ public class ResultMergeLocalFile extends ResultMerge } //sort sparse due to append-only - if( appendOnly ) + if( appendOnly && !_isAccum ) mb.sortSparseRows(); //change sparsity if required after @@ -709,7 +711,7 @@ public class ResultMergeLocalFile extends ResultMerge } //sort sparse due to append-only - if( appendOnly ) + if( appendOnly && _isAccum ) mb.sortSparseRows(); //change sparsity if required after @@ -779,7 +781,7 @@ public class ResultMergeLocalFile extends ResultMerge } //sort sparse and exam sparsity due to append-only - if( appendOnly ) + if( appendOnly && !_isAccum ) mb.sortSparseRows(); //change sparsity if required after @@ -801,7 +803,7 @@ public class ResultMergeLocalFile extends ResultMerge } //sort sparse due to append-only - if( appendOnly ) + if( appendOnly && !_isAccum ) mb.sortSparseRows(); //change sparsity if required after @@ -903,7 +905,7 @@ public class ResultMergeLocalFile extends ResultMerge } //sort sparse due to append-only - if( appendOnly ) + if( appendOnly && _isAccum ) mb.sortSparseRows(); //change sparsity if required after @@ -925,7 +927,7 @@ public class ResultMergeLocalFile extends ResultMerge } //sort sparse due to append-only - if( appendOnly ) + if( appendOnly && _isAccum ) mb.sortSparseRows(); //change sparsity if required after http://git-wip-us.apache.org/repos/asf/systemml/blob/2ef6342f/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalMemory.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalMemory.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalMemory.java index a59e81c..b2f5907 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalMemory.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalMemory.java @@ -42,6 +42,8 @@ import org.apache.sysml.runtime.util.DataConverter; */ public class ResultMergeLocalMemory extends ResultMerge { + private static final long serialVersionUID = -3543612508601511701L; + //internal comparison matrix private DenseBlock _compare = null; http://git-wip-us.apache.org/repos/asf/systemml/blob/2ef6342f/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteMR.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteMR.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteMR.java index 4b1bb0e..c20f0db 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteMR.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteMR.java @@ -58,7 +58,8 @@ import org.apache.sysml.utils.Statistics; * */ public class ResultMergeRemoteMR extends ResultMerge -{ +{ + private static final long serialVersionUID = 575681838941682037L; public static final byte COMPARE_TAG = 'c'; public static final byte DATA_TAG = 'd'; @@ -185,11 +186,12 @@ public class ResultMergeRemoteMR extends ResultMerge if( withCompare ) { FileSystem fs = IOUtilFunctions.getFileSystem(pathNew, job); pathCompare = new Path(fname).makeQualified(fs); - MRJobConfiguration.setResultMergeInfo(job, pathCompare.toString(), ii, + MRJobConfiguration.setResultMergeInfo(job, pathCompare.toString(), _isAccum, ii, LocalFileUtils.getWorkingDir(LocalFileUtils.CATEGORY_RESULTMERGE), rlen, clen, brlen, bclen); } else - MRJobConfiguration.setResultMergeInfo(job, "null", ii, LocalFileUtils.getWorkingDir(LocalFileUtils.CATEGORY_RESULTMERGE), rlen, clen, bclen, bclen); + MRJobConfiguration.setResultMergeInfo(job, "null", _isAccum, ii, + LocalFileUtils.getWorkingDir(LocalFileUtils.CATEGORY_RESULTMERGE), rlen, clen, bclen, bclen); //set mappers, reducers, combiners http://git-wip-us.apache.org/repos/asf/systemml/blob/2ef6342f/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteReducer.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteReducer.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteReducer.java index e12a052..6fa6534 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteReducer.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteReducer.java @@ -74,18 +74,17 @@ public class ResultMergeRemoteReducer String compareFname = MRJobConfiguration.getResultMergeInfoCompareFilename(job); //determine compare required - boolean requiresCompare = false; - if( !compareFname.equals("null") ) - requiresCompare = true; + boolean requiresCompare = !compareFname.equals("null"); + boolean isAccum = MRJobConfiguration.getResultMergeInfoAccumulator(job); if( ii == InputInfo.TextCellInputInfo ) _reducer = new ResultMergeReducerTextCell(requiresCompare); else if( ii == InputInfo.BinaryCellInputInfo ) _reducer = new ResultMergeReducerBinaryCell(requiresCompare); else if( ii == InputInfo.BinaryBlockInputInfo ) - _reducer = new ResultMergeReducerBinaryBlock(requiresCompare, job); + _reducer = new ResultMergeReducerBinaryBlock(requiresCompare, isAccum, job); else - throw new RuntimeException("Unable to configure mapper with unknown input info: "+ii.toString()); + throw new RuntimeException("Unable to configure mapper with unknown input info: "+ii.toString()+" "+isAccum); } @Override @@ -266,12 +265,15 @@ public class ResultMergeRemoteReducer private static class ResultMergeReducerBinaryBlock extends ResultMerge implements ResultMergeReducer { + private static final long serialVersionUID = 84399890805869855L; + private boolean _requiresCompare; private JobConf _job = null; - public ResultMergeReducerBinaryBlock(boolean requiresCompare, JobConf job) { + public ResultMergeReducerBinaryBlock(boolean requiresCompare, boolean isAccum, JobConf job) { _requiresCompare = requiresCompare; _job = job; + _isAccum = isAccum; } @Override @@ -323,7 +325,7 @@ public class ResultMergeRemoteReducer } //sort sparse due to append-only - if( appendOnly ) + if( appendOnly && !_isAccum ) mbOut.sortSparseRows(); //change sparsity if required after http://git-wip-us.apache.org/repos/asf/systemml/blob/2ef6342f/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteSpark.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteSpark.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteSpark.java index 804e490..a68c23e 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteSpark.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteSpark.java @@ -43,12 +43,9 @@ import org.apache.sysml.runtime.matrix.data.MatrixIndexes; import org.apache.sysml.runtime.matrix.data.OutputInfo; import org.apache.sysml.utils.Statistics; -/** - * MR job class for submitting parfor result merge MR jobs. - * - */ public class ResultMergeRemoteSpark extends ResultMerge -{ +{ + private static final long serialVersionUID = -6924566953903424820L; private ExecutionContext _ec = null; private int _numMappers = -1; @@ -162,23 +159,24 @@ public class ResultMergeRemoteSpark extends ResultMerge //Step 2a: merge with compare JavaPairRDD<MatrixIndexes, MatrixBlock> out = null; - if( withCompare ) - { + if( withCompare ) { JavaPairRDD<MatrixIndexes, MatrixBlock> compareRdd = (JavaPairRDD<MatrixIndexes, MatrixBlock>) sec.getRDDHandleForMatrixObject(compare, InputInfo.BinaryBlockInputInfo); - + //merge values which differ from compare values - ResultMergeRemoteSparkWCompare cfun = new ResultMergeRemoteSparkWCompare(); + ResultMergeRemoteSparkWCompare cfun = new ResultMergeRemoteSparkWCompare(_isAccum); out = rdd.groupByKey(numRed) //group all result blocks per key - .join(compareRdd) //join compare block and result blocks - .mapToPair(cfun); //merge result blocks w/ compare + .join(compareRdd) //join compare block and result blocks + .mapToPair(cfun); //merge result blocks w/ compare } //Step 2b: merge without compare else { //direct merge in any order (disjointness guaranteed) - out = RDDAggregateUtils.mergeByKey(rdd, false); + out = _isAccum ? + RDDAggregateUtils.sumByKeyStable(rdd, false) : + RDDAggregateUtils.mergeByKey(rdd, false); } - + //Step 3: create output rdd handle w/ lineage ret = new RDDObject(out); for(int i=0; i<paths.length; i++) @@ -188,7 +186,7 @@ public class ResultMergeRemoteSpark extends ResultMerge } catch( Exception ex ) { throw new DMLRuntimeException(ex); - } + } //maintain statistics Statistics.incrementNoOfCompiledSPInst(); @@ -203,9 +201,7 @@ public class ResultMergeRemoteSpark extends ResultMerge private static int determineNumReducers(long rlen, long clen, int brlen, int bclen, long numRed) { //set the number of mappers and reducers long reducerGroups = Math.max(rlen/brlen,1) * Math.max(clen/bclen, 1); - int ret = (int)Math.min( numRed, reducerGroups ); - - return ret; + return (int)Math.min( numRed, reducerGroups ); } @SuppressWarnings("unchecked") http://git-wip-us.apache.org/repos/asf/systemml/blob/2ef6342f/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteSparkWCompare.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteSparkWCompare.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteSparkWCompare.java index 27cc894..9a755b2 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteSparkWCompare.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteSparkWCompare.java @@ -36,6 +36,10 @@ public class ResultMergeRemoteSparkWCompare extends ResultMerge implements PairF { private static final long serialVersionUID = -5970805069405942836L; + public ResultMergeRemoteSparkWCompare(boolean accum) { + _isAccum = accum; + } + @Override public Tuple2<MatrixIndexes, MatrixBlock> call(Tuple2<MatrixIndexes, Tuple2<Iterable<MatrixBlock>, MatrixBlock>> arg) throws Exception http://git-wip-us.apache.org/repos/asf/systemml/blob/2ef6342f/src/main/java/org/apache/sysml/runtime/matrix/mapred/MRJobConfiguration.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/mapred/MRJobConfiguration.java b/src/main/java/org/apache/sysml/runtime/matrix/mapred/MRJobConfiguration.java index 378d1ef..1c233fe 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/mapred/MRJobConfiguration.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/mapred/MRJobConfiguration.java @@ -160,6 +160,7 @@ public class MRJobConfiguration //result merge info private static final String RESULTMERGE_INPUT_INFO_CONFIG="resultmerge.input.inputinfo"; private static final String RESULTMERGE_COMPARE_FILENAME_CONFIG="resultmerge.compare.filename"; + private static final String RESULTMERGE_ACCUMULATOR_CONFIG="resultmerge.accumulator"; private static final String RESULTMERGE_STAGING_DIR_CONFIG="resultmerge.staging.dir"; private static final String RESULTMERGE_MATRIX_NUM_ROW_CONFIG="resultmerge.matrix.num.row"; private static final String RESULTMERGE_MATRIX_NUM_COLUMN_CONFIG="resultmerge.matrix.num.column"; @@ -632,10 +633,11 @@ public class MRJobConfiguration return job.getBoolean(PARTITIONING_TRANSPOSE_COL_CONFIG, false); } - public static void setResultMergeInfo( JobConf job, String fnameNew, InputInfo ii, String stagingDir, long rlen, long clen, int brlen, int bclen ) + public static void setResultMergeInfo( JobConf job, String fnameNew, boolean accum, InputInfo ii, String stagingDir, long rlen, long clen, int brlen, int bclen ) throws DMLRuntimeException { job.set(RESULTMERGE_COMPARE_FILENAME_CONFIG, fnameNew); + job.set(RESULTMERGE_ACCUMULATOR_CONFIG, String.valueOf(accum)); job.set(RESULTMERGE_INPUT_INFO_CONFIG, InputInfo.inputInfoToString(ii)); job.set(RESULTMERGE_STAGING_DIR_CONFIG, stagingDir); job.set(RESULTMERGE_MATRIX_NUM_ROW_CONFIG, String.valueOf(rlen)); @@ -644,11 +646,14 @@ public class MRJobConfiguration job.set(RESULTMERGE_BLOCK_NUM_COLUMN_CONFIG, String.valueOf(bclen)); } - public static String getResultMergeInfoCompareFilename( JobConf job ) - { + public static String getResultMergeInfoCompareFilename( JobConf job ) { return job.get(RESULTMERGE_COMPARE_FILENAME_CONFIG); } + public static boolean getResultMergeInfoAccumulator( JobConf job ) { + return Boolean.parseBoolean(job.get(RESULTMERGE_ACCUMULATOR_CONFIG)); + } + public static InputInfo getResultMergeInputInfo( JobConf job ) { return InputInfo.stringToInputInfo( job.get(RESULTMERGE_INPUT_INFO_CONFIG) ); http://git-wip-us.apache.org/repos/asf/systemml/blob/2ef6342f/src/test/java/org/apache/sysml/test/integration/functions/parfor/ParForAccumulatorResultMergeTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/sysml/test/integration/functions/parfor/ParForAccumulatorResultMergeTest.java b/src/test/java/org/apache/sysml/test/integration/functions/parfor/ParForAccumulatorResultMergeTest.java index 7936c26..3ce5f73 100644 --- a/src/test/java/org/apache/sysml/test/integration/functions/parfor/ParForAccumulatorResultMergeTest.java +++ b/src/test/java/org/apache/sysml/test/integration/functions/parfor/ParForAccumulatorResultMergeTest.java @@ -70,6 +70,46 @@ public class ParForAccumulatorResultMergeTest extends AutomatedTestBase public void testParForAccumulatorLocalInitSparse() { runParForAccumulatorResultMergeTest(TEST_NAME1, true, true, ExecType.CP); } + + @Test + public void testParForAccumulatorRemoteEmptyDenseMR() { + runParForAccumulatorResultMergeTest(TEST_NAME2, false, false, ExecType.MR); + } + + @Test + public void testParForAccumulatorRemoteEmptySparseMR() { + runParForAccumulatorResultMergeTest(TEST_NAME2, false, true, ExecType.MR); + } + + @Test + public void testParForAccumulatorRemoteInitDenseMR() { + runParForAccumulatorResultMergeTest(TEST_NAME2, true, false, ExecType.MR); + } + + @Test + public void testParForAccumulatorRemoteInitSparseMR() { + runParForAccumulatorResultMergeTest(TEST_NAME2, true, true, ExecType.MR); + } + + @Test + public void testParForAccumulatorRemoteEmptyDenseSP() { + runParForAccumulatorResultMergeTest(TEST_NAME3, false, false, ExecType.SPARK); + } + + @Test + public void testParForAccumulatorRemoteEmptySparseSP() { + runParForAccumulatorResultMergeTest(TEST_NAME3, false, true, ExecType.SPARK); + } + + @Test + public void testParForAccumulatorRemoteInitDenseSP() { + runParForAccumulatorResultMergeTest(TEST_NAME3, true, false, ExecType.SPARK); + } + + @Test + public void testParForAccumulatorRemoteInitSparseSP() { + runParForAccumulatorResultMergeTest(TEST_NAME3, true, true, ExecType.SPARK); + } private void runParForAccumulatorResultMergeTest( String test, boolean init, boolean sparse, ExecType et ) { @@ -96,16 +136,15 @@ public class ParForAccumulatorResultMergeTest extends AutomatedTestBase programArgs = new String[]{"-args", String.valueOf(rows), String.valueOf(cols), String.valueOf(init).toUpperCase(), String.valueOf(sparse).toUpperCase(), output("R") }; - - fullRScriptName = HOME + TEST_NAME + ".R"; + fullRScriptName = HOME + TEST_NAME.substring(0, TEST_NAME.length()-1) + ".R"; rCmd = "Rscript" + " " + fullRScriptName + " " + String.valueOf(rows) + " " + String.valueOf(cols) + " " + String.valueOf(init).toUpperCase() + " " + String.valueOf(sparse).toUpperCase() + " " + expectedDir(); - + //run tests runTest(true, false, null, -1); runRScript(true); - + //compare matrices HashMap<CellIndex, Double> dmlfile = readDMLMatrixFromHDFS("R"); HashMap<CellIndex, Double> rfile = readRMatrixFromFS("R"); http://git-wip-us.apache.org/repos/asf/systemml/blob/2ef6342f/src/test/scripts/functions/parfor/parfor_accumulator.R ---------------------------------------------------------------------- diff --git a/src/test/scripts/functions/parfor/parfor_accumulator.R b/src/test/scripts/functions/parfor/parfor_accumulator.R new file mode 100644 index 0000000..a1c587c --- /dev/null +++ b/src/test/scripts/functions/parfor/parfor_accumulator.R @@ -0,0 +1,39 @@ +#------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +#------------------------------------------------------------- + + +args <- commandArgs(TRUE) +options(digits=22) + +library("Matrix") + +rlen = as.integer(args[1]); +clen = as.integer(args[2]); + +R = matrix(ifelse(as.logical(args[3]), 7, 0), rlen, clen); +if( as.logical(args[4]) ) { + R[,50:300] = matrix(0, rlen, 251); +} +for(i in 1:10) { + R = R + matrix(i, rlen, clen); +} + +writeMM(as(R, "CsparseMatrix"), paste(args[5], "R", sep="")); http://git-wip-us.apache.org/repos/asf/systemml/blob/2ef6342f/src/test/scripts/functions/parfor/parfor_accumulator1.R ---------------------------------------------------------------------- diff --git a/src/test/scripts/functions/parfor/parfor_accumulator1.R b/src/test/scripts/functions/parfor/parfor_accumulator1.R deleted file mode 100644 index a1c587c..0000000 --- a/src/test/scripts/functions/parfor/parfor_accumulator1.R +++ /dev/null @@ -1,39 +0,0 @@ -#------------------------------------------------------------- -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -#------------------------------------------------------------- - - -args <- commandArgs(TRUE) -options(digits=22) - -library("Matrix") - -rlen = as.integer(args[1]); -clen = as.integer(args[2]); - -R = matrix(ifelse(as.logical(args[3]), 7, 0), rlen, clen); -if( as.logical(args[4]) ) { - R[,50:300] = matrix(0, rlen, 251); -} -for(i in 1:10) { - R = R + matrix(i, rlen, clen); -} - -writeMM(as(R, "CsparseMatrix"), paste(args[5], "R", sep="")); http://git-wip-us.apache.org/repos/asf/systemml/blob/2ef6342f/src/test/scripts/functions/parfor/parfor_accumulator2.dml ---------------------------------------------------------------------- diff --git a/src/test/scripts/functions/parfor/parfor_accumulator2.dml b/src/test/scripts/functions/parfor/parfor_accumulator2.dml new file mode 100644 index 0000000..e81a4cc --- /dev/null +++ b/src/test/scripts/functions/parfor/parfor_accumulator2.dml @@ -0,0 +1,34 @@ +#------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +#------------------------------------------------------------- + +rlen = $1; +clen = $2; +init = $3 +sparse = $4; + +R = matrix(ifelse(init, 7, 0), rlen, clen); +if( sparse ) + R[,50:300] = matrix(0, rlen, 251); + +parfor(i in 1:10, opt=CONSTRAINED, resultmerge=REMOTE_MR) + R += matrix(i, rlen, clen); + +write(R, $5); http://git-wip-us.apache.org/repos/asf/systemml/blob/2ef6342f/src/test/scripts/functions/parfor/parfor_accumulator3.dml ---------------------------------------------------------------------- diff --git a/src/test/scripts/functions/parfor/parfor_accumulator3.dml b/src/test/scripts/functions/parfor/parfor_accumulator3.dml new file mode 100644 index 0000000..7418917 --- /dev/null +++ b/src/test/scripts/functions/parfor/parfor_accumulator3.dml @@ -0,0 +1,34 @@ +#------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +#------------------------------------------------------------- + +rlen = $1; +clen = $2; +init = $3 +sparse = $4; + +R = matrix(ifelse(init, 7, 0), rlen, clen); +if( sparse ) + R[,50:300] = matrix(0, rlen, 251); + +parfor(i in 1:10, opt=CONSTRAINED, resultmerge=REMOTE_SPARK) + R += matrix(i, rlen, clen); + +write(R, $5);
