This is an automated email from the ASF dual-hosted git repository.
mboehm7 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/main by this push:
new 78b340fe9e [MINOR] Additional parfor tests, and minor cleanups of unused code
78b340fe9e is described below
commit 78b340fe9eb84adbc1e236f543366b5d53653faf
Author: Matthias Boehm <[email protected]>
AuthorDate: Sat Aug 17 13:42:11 2024 +0200
[MINOR] Additional parfor tests, and minor cleanups of unused code
---
src/main/java/org/apache/sysds/hops/LiteralOp.java | 2 +-
.../sysds/parser/BuiltinFunctionExpression.java | 1 +
.../runtime/controlprogram/ParForProgramBlock.java | 47 +-----
.../parfor/ResultMergeRemoteGrouping.java | 42 -----
.../parfor/ResultMergeRemoteSorting.java | 46 -----
.../parfor/ResultMergeTaggedMatrixIndexes.java | 94 -----------
.../controlprogram/parfor/TaskPartitioner.java | 4 +-
.../parfor/TaskPartitionerFactory.java | 50 ++++++
.../parfor/TaskPartitionerNaive.java | 6 +-
.../parfor/TaskPartitionerStatic.java | 6 +-
.../parfor/opt/OptTreePlanChecker.java | 185 ---------------------
.../parfor/opt/OptimizationWrapper.java | 13 --
.../sysds/runtime/util/BinaryBlockInputFormat.java | 48 ------
.../runtime/util/BinaryBlockRecordReader.java | 68 --------
.../apache/sysds/utils/SystemDSLoaderUtils.java | 41 -----
.../sysds/test/component/misc/OpTypeTest.java | 4 -
.../test/component/parfor/TaskPartitionerTest.java | 85 ++++++++++
.../functions/builtin/part2/BuiltinRaJoinTest.java | 1 -
.../parfor/misc/ParForRecursiveFunctionTest.java | 84 ++++++++++
.../scripts/functions/parfor/parfor_recursive.dml | 37 +++++
20 files changed, 270 insertions(+), 594 deletions(-)
diff --git a/src/main/java/org/apache/sysds/hops/LiteralOp.java
b/src/main/java/org/apache/sysds/hops/LiteralOp.java
index 9c44ef1187..1d2911f1fa 100644
--- a/src/main/java/org/apache/sysds/hops/LiteralOp.java
+++ b/src/main/java/org/apache/sysds/hops/LiteralOp.java
@@ -254,8 +254,8 @@ public class LiteralOp extends Hop
case FP64:
return String.valueOf(value_double);
case STRING:
+ case HASH32:
case HASH64:
- return value_string;
case CHARACTER:
return value_string;
case UNKNOWN:
diff --git
a/src/main/java/org/apache/sysds/parser/BuiltinFunctionExpression.java
b/src/main/java/org/apache/sysds/parser/BuiltinFunctionExpression.java
index ec9b4a4bbd..de10e090f5 100644
--- a/src/main/java/org/apache/sysds/parser/BuiltinFunctionExpression.java
+++ b/src/main/java/org/apache/sysds/parser/BuiltinFunctionExpression.java
@@ -965,6 +965,7 @@ public class BuiltinFunctionExpression extends
DataIdentifier {
case CHARACTER:
case FP64:
case FP32:
+ case HASH32:
case HASH64: //default
output.setValueType(ValueType.FP64);
break;
diff --git
a/src/main/java/org/apache/sysds/runtime/controlprogram/ParForProgramBlock.java
b/src/main/java/org/apache/sysds/runtime/controlprogram/ParForProgramBlock.java
index c4c75c35e7..01896cb73e 100644
---
a/src/main/java/org/apache/sysds/runtime/controlprogram/ParForProgramBlock.java
+++
b/src/main/java/org/apache/sysds/runtime/controlprogram/ParForProgramBlock.java
@@ -75,12 +75,7 @@ import
org.apache.sysds.runtime.controlprogram.parfor.ResultMergeLocalMemory;
import org.apache.sysds.runtime.controlprogram.parfor.ResultMergeRemoteSpark;
import org.apache.sysds.runtime.controlprogram.parfor.Task;
import org.apache.sysds.runtime.controlprogram.parfor.TaskPartitioner;
-import org.apache.sysds.runtime.controlprogram.parfor.TaskPartitionerFactoring;
-import
org.apache.sysds.runtime.controlprogram.parfor.TaskPartitionerFactoringCmax;
-import
org.apache.sysds.runtime.controlprogram.parfor.TaskPartitionerFactoringCmin;
-import org.apache.sysds.runtime.controlprogram.parfor.TaskPartitionerFixedsize;
-import org.apache.sysds.runtime.controlprogram.parfor.TaskPartitionerNaive;
-import org.apache.sysds.runtime.controlprogram.parfor.TaskPartitionerStatic;
+import org.apache.sysds.runtime.controlprogram.parfor.TaskPartitionerFactory;
import org.apache.sysds.runtime.controlprogram.parfor.opt.OptTreeConverter;
import org.apache.sysds.runtime.controlprogram.parfor.opt.OptimizationWrapper;
import org.apache.sysds.runtime.controlprogram.parfor.opt.OptimizerRuleBased;
@@ -1279,43 +1274,9 @@ public class ParForProgramBlock extends ForProgramBlock {
* @param incr ?
* @return task partitioner
*/
- private TaskPartitioner createTaskPartitioner( IntObject from,
IntObject to, IntObject incr )
- {
- TaskPartitioner tp;
-
- switch( _taskPartitioner ) {
- case FIXED:
- tp = new TaskPartitionerFixedsize(
- _taskSize, _iterPredVar, from, to,
incr);
- break;
- case NAIVE:
- tp = new TaskPartitionerNaive(
- _taskSize, _iterPredVar, from, to,
incr);
- break;
- case STATIC:
- tp = new TaskPartitionerStatic(
- _taskSize, _numThreads, _iterPredVar,
from, to, incr);
- break;
- case FACTORING:
- tp = new TaskPartitionerFactoring(
- _taskSize,_numThreads, _iterPredVar,
from, to, incr);
- break;
- case FACTORING_CMIN:
- //for constrained factoring the tasksize is
used as the minimum constraint
- tp = new
TaskPartitionerFactoringCmin(_taskSize,_numThreads,
- _taskSize, _iterPredVar, from, to,
incr);
- break;
-
- case FACTORING_CMAX:
- //for constrained factoring the tasksize is
used as the minimum constraint
- tp = new
TaskPartitionerFactoringCmax(_taskSize,_numThreads,
- _taskSize, _iterPredVar, from, to,
incr);
- break;
- default:
- throw new DMLRuntimeException("Undefined task
partitioner: '"+_taskPartitioner+"'.");
- }
-
- return tp;
+ private TaskPartitioner createTaskPartitioner( IntObject from,
IntObject to, IntObject incr ) {
+ return TaskPartitionerFactory.createTaskPartitioner(
+ _taskPartitioner, from, to, incr, _taskSize,
_numThreads, _iterPredVar);
}
/**
diff --git
a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/ResultMergeRemoteGrouping.java
b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/ResultMergeRemoteGrouping.java
deleted file mode 100644
index ccf27c9a8f..0000000000
---
a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/ResultMergeRemoteGrouping.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysds.runtime.controlprogram.parfor;
-
-
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.WritableComparator;
-
-public class ResultMergeRemoteGrouping extends WritableComparator
-{
- protected ResultMergeRemoteGrouping() {
- super(ResultMergeTaggedMatrixIndexes.class,true);
- }
-
- @Override
- @SuppressWarnings("rawtypes")
- public int compare(WritableComparable k1, WritableComparable k2)
- {
- ResultMergeTaggedMatrixIndexes key1 =
(ResultMergeTaggedMatrixIndexes)k1;
- ResultMergeTaggedMatrixIndexes key2 =
(ResultMergeTaggedMatrixIndexes)k2;
-
- //group by matrix indexes only (including all tags)
- return key1.getIndexes().compareTo(key2.getIndexes());
- }
-}
diff --git
a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/ResultMergeRemoteSorting.java
b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/ResultMergeRemoteSorting.java
deleted file mode 100644
index 5d44903958..0000000000
---
a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/ResultMergeRemoteSorting.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysds.runtime.controlprogram.parfor;
-
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.WritableComparator;
-
-public class ResultMergeRemoteSorting extends WritableComparator
-{
- protected ResultMergeRemoteSorting() {
- super(ResultMergeTaggedMatrixIndexes.class, true);
- }
-
- @Override
- @SuppressWarnings("rawtypes")
- public int compare(WritableComparable k1, WritableComparable k2)
- {
- ResultMergeTaggedMatrixIndexes key1 =
(ResultMergeTaggedMatrixIndexes)k1;
- ResultMergeTaggedMatrixIndexes key2 =
(ResultMergeTaggedMatrixIndexes)k2;
-
- int ret = key1.getIndexes().compareTo(key2.getIndexes());
- if( ret == 0 ) //same indexes, secondary sort
- {
- ret = ((key1.getTag() == key2.getTag()) ? 0 :
- (key1.getTag() < key2.getTag())? -1 : 1);
- }
- return ret;
- }
-}
diff --git
a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/ResultMergeTaggedMatrixIndexes.java
b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/ResultMergeTaggedMatrixIndexes.java
deleted file mode 100644
index c0437dce35..0000000000
---
a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/ResultMergeTaggedMatrixIndexes.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysds.runtime.controlprogram.parfor;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.sysds.runtime.matrix.data.MatrixIndexes;
-
-/**
- * This class serves as composite key for the remote result merge job
- * (for any data format) in order to sort on both matrix indexes and tag
- * but group all blocks according to matrix indexes only. This prevents
- * us from doing an 2pass out-of-core algorithm at the reducer since we
- * can guarantee that the compare block (tag 0) will be the first element
- * in the iterator.
- *
- */
-public class ResultMergeTaggedMatrixIndexes implements
WritableComparable<ResultMergeTaggedMatrixIndexes>
-{
- private MatrixIndexes _ix;
- private byte _tag = -1;
-
- public ResultMergeTaggedMatrixIndexes() {
- _ix = new MatrixIndexes();
- }
-
- public MatrixIndexes getIndexes() {
- return _ix;
- }
-
- public byte getTag() {
- return _tag;
- }
-
- public void setTag(byte tag) {
- _tag = tag;
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- if( _ix == null )
- _ix = new MatrixIndexes();
- _ix.readFields(in);
- _tag = in.readByte();
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- _ix.write(out);
- out.writeByte(_tag);
- }
-
- @Override
- public int compareTo(ResultMergeTaggedMatrixIndexes that) {
- int ret = _ix.compareTo(that._ix);
- if( ret == 0 )
- ret = ((_tag == that._tag) ? 0 :
- (_tag < that._tag)? -1 : 1);
- return ret;
- }
-
- @Override
- public boolean equals(Object other) {
- if( !(other instanceof ResultMergeTaggedMatrixIndexes) )
- return false;
- ResultMergeTaggedMatrixIndexes that =
(ResultMergeTaggedMatrixIndexes)other;
- return (_ix.equals(that._ix) && _tag == that._tag);
- }
-
- @Override
- public int hashCode() {
- throw new RuntimeException("hashCode() should never be called
on instances of this class.");
- }
-}
diff --git
a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/TaskPartitioner.java
b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/TaskPartitioner.java
index abd0279077..9d46947ba6 100644
---
a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/TaskPartitioner.java
+++
b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/TaskPartitioner.java
@@ -33,8 +33,8 @@ import org.apache.sysds.runtime.instructions.cp.IntObject;
*/
public abstract class TaskPartitioner
{
- protected long _taskSize = -1;
- protected String _iterVarName = null;
+ protected long _taskSize = -1;
+ protected String _iterVarName = null;
protected IntObject _fromVal = null;
protected IntObject _toVal = null;
protected IntObject _incrVal = null;
diff --git
a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/TaskPartitionerFactory.java
b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/TaskPartitionerFactory.java
new file mode 100644
index 0000000000..66d4961104
--- /dev/null
+++
b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/TaskPartitionerFactory.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.controlprogram.parfor;
+
+import org.apache.sysds.runtime.DMLRuntimeException;
+import
org.apache.sysds.runtime.controlprogram.ParForProgramBlock.PTaskPartitioner;
+import org.apache.sysds.runtime.instructions.cp.IntObject;
+
+public abstract class TaskPartitionerFactory
+{
+ public static TaskPartitioner createTaskPartitioner(PTaskPartitioner
type,
+ IntObject from, IntObject to, IntObject incr, long taskSize,
int numThreads, String iterPredVar)
+ {
+ switch( type ) {
+ case FIXED:
+ return new TaskPartitionerFixedsize(taskSize,
iterPredVar, from, to, incr);
+ case NAIVE:
+ return new TaskPartitionerNaive(taskSize,
iterPredVar, from, to, incr);
+ case STATIC:
+ return new TaskPartitionerStatic(taskSize,
numThreads, iterPredVar, from, to, incr);
+ case FACTORING:
+ return new TaskPartitionerFactoring(taskSize,
numThreads, iterPredVar, from, to, incr);
+ case FACTORING_CMIN:
+ return new
TaskPartitionerFactoringCmin(taskSize,
+ numThreads, taskSize, iterPredVar,
from, to, incr);
+ case FACTORING_CMAX:
+ return new
TaskPartitionerFactoringCmax(taskSize,
+ numThreads, taskSize, iterPredVar,
from, to, incr);
+ default:
+ throw new DMLRuntimeException("Undefined task
partitioner: '"+type+"'.");
+ }
+ }
+}
diff --git
a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/TaskPartitionerNaive.java
b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/TaskPartitionerNaive.java
index 7704974548..3894bcff92 100644
---
a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/TaskPartitionerNaive.java
+++
b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/TaskPartitionerNaive.java
@@ -29,12 +29,12 @@ import org.apache.sysds.runtime.instructions.cp.IntObject;
*/
public class TaskPartitionerNaive extends TaskPartitionerFixedsize
{
-
- public TaskPartitionerNaive( long taskSize, String iterVarName,
IntObject fromVal, IntObject toVal, IntObject incrVal )
+ public TaskPartitionerNaive( long taskSize, String iterVarName,
+ IntObject fromVal, IntObject toVal, IntObject incrVal )
{
super(taskSize, iterVarName, fromVal, toVal, incrVal);
//compute the new task size
_taskSize = 1;
- }
+ }
}
diff --git
a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/TaskPartitionerStatic.java
b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/TaskPartitionerStatic.java
index 54ea8cbdc1..244cca320d 100644
---
a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/TaskPartitionerStatic.java
+++
b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/TaskPartitionerStatic.java
@@ -29,12 +29,12 @@ import org.apache.sysds.runtime.instructions.cp.IntObject;
*/
public class TaskPartitionerStatic extends TaskPartitionerFixedsize
{
-
- public TaskPartitionerStatic( long taskSize, int numThreads, String
iterVarName, IntObject fromVal, IntObject toVal, IntObject incrVal )
+ public TaskPartitionerStatic( long taskSize, int numThreads,
+ String iterVarName, IntObject fromVal, IntObject toVal,
IntObject incrVal )
{
super(taskSize, iterVarName, fromVal, toVal, incrVal);
_taskSize = _numIter / numThreads;
_firstnPlus1 = (int)_numIter % numThreads;
- }
+ }
}
diff --git
a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/opt/OptTreePlanChecker.java
b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/opt/OptTreePlanChecker.java
deleted file mode 100644
index 2cbf1228a9..0000000000
---
a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/opt/OptTreePlanChecker.java
+++ /dev/null
@@ -1,185 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysds.runtime.controlprogram.parfor.opt;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Set;
-
-import org.apache.sysds.hops.FunctionOp;
-import org.apache.sysds.hops.Hop;
-import org.apache.sysds.parser.DMLProgram;
-import org.apache.sysds.parser.ForStatement;
-import org.apache.sysds.parser.ForStatementBlock;
-import org.apache.sysds.parser.FunctionStatement;
-import org.apache.sysds.parser.FunctionStatementBlock;
-import org.apache.sysds.parser.IfStatement;
-import org.apache.sysds.parser.IfStatementBlock;
-import org.apache.sysds.parser.StatementBlock;
-import org.apache.sysds.parser.WhileStatement;
-import org.apache.sysds.parser.WhileStatementBlock;
-import org.apache.sysds.runtime.DMLRuntimeException;
-import org.apache.sysds.runtime.controlprogram.BasicProgramBlock;
-import org.apache.sysds.runtime.controlprogram.ForProgramBlock;
-import org.apache.sysds.runtime.controlprogram.FunctionProgramBlock;
-import org.apache.sysds.runtime.controlprogram.IfProgramBlock;
-import org.apache.sysds.runtime.controlprogram.Program;
-import org.apache.sysds.runtime.controlprogram.ProgramBlock;
-import org.apache.sysds.runtime.controlprogram.WhileProgramBlock;
-import org.apache.sysds.runtime.instructions.Instruction;
-import org.apache.sysds.runtime.instructions.cp.FunctionCallCPInstruction;
-
-public class OptTreePlanChecker
-{
-
- public static void checkProgramCorrectness( ProgramBlock pb,
StatementBlock sb, Set<String> fnStack )
- {
- Program prog = pb.getProgram();
- DMLProgram dprog = sb.getDMLProg();
-
- if (pb instanceof FunctionProgramBlock && sb instanceof
FunctionStatementBlock ) {
- FunctionProgramBlock fpb = (FunctionProgramBlock)pb;
- FunctionStatementBlock fsb = (FunctionStatementBlock)sb;
- FunctionStatement fstmt =
(FunctionStatement)fsb.getStatement(0);
- for( int i=0; i<fpb.getChildBlocks().size(); i++ ) {
- ProgramBlock pbc = fpb.getChildBlocks().get(i);
- StatementBlock sbc = fstmt.getBody().get(i);
- checkProgramCorrectness(pbc, sbc, fnStack);
- }
- }
- else if (pb instanceof WhileProgramBlock && sb instanceof
WhileStatementBlock) {
- WhileProgramBlock wpb = (WhileProgramBlock) pb;
- WhileStatementBlock wsb = (WhileStatementBlock) sb;
- WhileStatement wstmt = (WhileStatement)
wsb.getStatement(0);
- checkHopDagCorrectness(prog, dprog,
wsb.getPredicateHops(), wpb.getPredicate(), fnStack);
- for( int i=0; i<wpb.getChildBlocks().size(); i++ ) {
- ProgramBlock pbc = wpb.getChildBlocks().get(i);
- StatementBlock sbc = wstmt.getBody().get(i);
- checkProgramCorrectness(pbc, sbc, fnStack);
- }
- checkLinksProgramStatementBlock(wpb, wsb);
- }
- else if (pb instanceof IfProgramBlock && sb instanceof
IfStatementBlock) {
- IfProgramBlock ipb = (IfProgramBlock) pb;
- IfStatementBlock isb = (IfStatementBlock) sb;
- IfStatement istmt = (IfStatement) isb.getStatement(0);
- checkHopDagCorrectness(prog, dprog,
isb.getPredicateHops(), ipb.getPredicate(), fnStack);
- for( int i=0; i<ipb.getChildBlocksIfBody().size(); i++
) {
- ProgramBlock pbc =
ipb.getChildBlocksIfBody().get(i);
- StatementBlock sbc = istmt.getIfBody().get(i);
- checkProgramCorrectness(pbc, sbc, fnStack);
- }
- for( int i=0; i<ipb.getChildBlocksElseBody().size();
i++ ) {
- ProgramBlock pbc =
ipb.getChildBlocksElseBody().get(i);
- StatementBlock sbc = istmt.getElseBody().get(i);
- checkProgramCorrectness(pbc, sbc, fnStack);
- }
- checkLinksProgramStatementBlock(ipb, isb);
- }
- else if (pb instanceof ForProgramBlock && sb instanceof
ForStatementBlock) { //incl parfor
- ForProgramBlock fpb = (ForProgramBlock) pb;
- ForStatementBlock fsb = (ForStatementBlock) sb;
- ForStatement fstmt = (ForStatement) sb.getStatement(0);
- checkHopDagCorrectness(prog, dprog, fsb.getFromHops(),
fpb.getFromInstructions(), fnStack);
- checkHopDagCorrectness(prog, dprog, fsb.getToHops(),
fpb.getToInstructions(), fnStack);
- checkHopDagCorrectness(prog, dprog,
fsb.getIncrementHops(), fpb.getIncrementInstructions(), fnStack);
- for( int i=0; i<fpb.getChildBlocks().size(); i++ ) {
- ProgramBlock pbc = fpb.getChildBlocks().get(i);
- StatementBlock sbc = fstmt.getBody().get(i);
- checkProgramCorrectness(pbc, sbc, fnStack);
- }
- checkLinksProgramStatementBlock(fpb, fsb);
- }
- else if( pb instanceof BasicProgramBlock ) {
- BasicProgramBlock bpb = (BasicProgramBlock) pb;
- checkHopDagCorrectness(prog, dprog, sb.getHops(),
bpb.getInstructions(), fnStack);
- }
- }
-
- private static void checkHopDagCorrectness( Program prog, DMLProgram
dprog, ArrayList<Hop> roots, ArrayList<Instruction> inst, Set<String> fnStack )
{
- if( roots != null )
- for( Hop hop : roots )
- checkHopDagCorrectness(prog, dprog, hop, inst,
fnStack);
- }
-
- private static void checkHopDagCorrectness( Program prog, DMLProgram
dprog, Hop root, ArrayList<Instruction> inst, Set<String> fnStack ) {
- //set of checks to perform
- checkFunctionNames(prog, dprog, root, inst, fnStack);
- }
-
- private static void checkLinksProgramStatementBlock( ProgramBlock pb,
StatementBlock sb ) {
- if( pb.getStatementBlock() != sb )
- throw new DMLRuntimeException("Links between
programblocks and statementblocks are incorrect ("+pb+").");
- }
-
- private static void checkFunctionNames( Program prog, DMLProgram dprog,
Hop root, ArrayList<Instruction> inst, Set<String> fnStack ) {
- //reset visit status of dag
- root.resetVisitStatus();
-
- //get all function op in this dag
- HashMap<String, FunctionOp> fops = new HashMap<>();
- getAllFunctionOps(root, fops);
-
- for( Instruction linst : inst )
- if( linst instanceof FunctionCallCPInstruction )
- {
- FunctionCallCPInstruction flinst =
(FunctionCallCPInstruction) linst;
- String fnamespace = flinst.getNamespace();
- String fname = flinst.getFunctionName();
- String key =
DMLProgram.constructFunctionKey(fnamespace, fname);
-
- //check 1: instruction name equal to hop name
- if( !fops.containsKey(key) )
- throw new DMLRuntimeException(
"Function Check: instruction and hop names differ ("+key+", "+fops.keySet()+")"
);
-
- //check 2: function exists
- if(
!prog.getFunctionProgramBlocks().containsKey(key) )
- throw new DMLRuntimeException(
"Function Check: function does not exits ("+key+")" );
-
- //check 3: recursive program check
- FunctionProgramBlock fpb =
prog.getFunctionProgramBlock(fnamespace, fname);
- FunctionStatementBlock fsb =
dprog.getFunctionStatementBlock(fnamespace, fname);
- if( !fnStack.contains(key) )
- {
- fnStack.add(key);
- checkProgramCorrectness(fpb, fsb,
fnStack);
- fnStack.remove(key);
- }
- }
- }
-
- private static void getAllFunctionOps( Hop hop, HashMap<String,
FunctionOp> memo )
- {
- if( hop.isVisited() )
- return;
-
- //process functionop
- if( hop instanceof FunctionOp ) {
- FunctionOp fop = (FunctionOp) hop;
- memo.put(fop.getFunctionKey(), fop);
- }
-
- //process children
- for( Hop in : hop.getInput() )
- getAllFunctionOps(in, memo);
-
- hop.setVisited();
- }
-}
diff --git
a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/opt/OptimizationWrapper.java
b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/opt/OptimizationWrapper.java
index 6dbc952172..4a7cec8cd2 100644
---
a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/opt/OptimizationWrapper.java
+++
b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/opt/OptimizationWrapper.java
@@ -20,7 +20,6 @@
package org.apache.sysds.runtime.controlprogram.parfor.opt;
import java.util.ArrayList;
-import java.util.HashSet;
import java.util.Set;
import org.apache.commons.logging.Log;
@@ -75,7 +74,6 @@ public class OptimizationWrapper
//internal parameters
public static final double PAR_FACTOR_INFRASTRUCTURE = 1.0;
- private static final boolean CHECK_PLAN_CORRECTNESS = false;
/**
@@ -227,17 +225,6 @@ public class OptimizationWrapper
//core optimize
opt.optimize(sb, pb, tree, est, numRuns, ec);
LOG.debug("ParFOR Opt: Optimized plan (after optimization): \n"
+ tree.explain(false));
-
- //assert plan correctness
- if( CHECK_PLAN_CORRECTNESS && LOG.isDebugEnabled() ) {
- try{
- OptTreePlanChecker.checkProgramCorrectness(pb,
sb, new HashSet<String>());
- LOG.debug("ParFOR Opt: Checked plan and program
correctness.");
- }
- catch(Exception ex) {
- throw new DMLRuntimeException("Failed to check
program correctness.", ex);
- }
- }
long ltime = (long) time.stop();
LOG.trace("ParFOR Opt: Optimized plan in "+ltime+"ms.");
diff --git
a/src/main/java/org/apache/sysds/runtime/util/BinaryBlockInputFormat.java
b/src/main/java/org/apache/sysds/runtime/util/BinaryBlockInputFormat.java
deleted file mode 100644
index d9d8fc080b..0000000000
--- a/src/main/java/org/apache/sysds/runtime/util/BinaryBlockInputFormat.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysds.runtime.util;
-
-import java.io.IOException;
-
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.SequenceFileInputFormat;
-import org.apache.sysds.runtime.matrix.data.MatrixBlock;
-import org.apache.sysds.runtime.matrix.data.MatrixIndexes;
-import org.apache.hadoop.mapred.FileSplit;
-
-/**
- * Custom binary block input format to return the custom record reader.
- * <p>
- * NOTE: Not used by default.
- * <p>
- * NOTE: Used for performance debugging of binary block HDFS reads.
- */
-public class BinaryBlockInputFormat extends
SequenceFileInputFormat<MatrixIndexes,MatrixBlock>
-{
- @Override
- public RecordReader<MatrixIndexes, MatrixBlock>
getRecordReader(InputSplit split, JobConf job, Reporter reporter)
- throws IOException
- {
- return new BinaryBlockRecordReader(job, (FileSplit)split);
- }
-}
\ No newline at end of file
diff --git
a/src/main/java/org/apache/sysds/runtime/util/BinaryBlockRecordReader.java
b/src/main/java/org/apache/sysds/runtime/util/BinaryBlockRecordReader.java
deleted file mode 100644
index ddae00d721..0000000000
--- a/src/main/java/org/apache/sysds/runtime/util/BinaryBlockRecordReader.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysds.runtime.util;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hadoop.mapred.SequenceFileRecordReader;
-import org.apache.sysds.runtime.matrix.data.MatrixBlock;
-import org.apache.sysds.runtime.matrix.data.MatrixIndexes;
-
-/**
- * Custom record reader for binary block. Currently its only purpose is to
allow for
- * detailed profiling of overall read time (io, deserialize, decompress).
- *
- * NOTE: not used by default.
- */
-public class BinaryBlockRecordReader extends
SequenceFileRecordReader<MatrixIndexes,MatrixBlock>
-{
- //private long _time = 0;
-
- public BinaryBlockRecordReader(Configuration conf, FileSplit split)
- throws IOException
- {
- super(conf, split);
-
- }
-
- @Override
- public synchronized boolean next(MatrixIndexes key, MatrixBlock value)
- throws IOException
- {
- //long t0 = System.nanoTime();
- boolean ret = super.next(key, value);
- //long t1 = System.nanoTime();
-
- //_time+=(t1-t0);
-
- return ret;
- }
-
- @Override
- public synchronized void close()
- throws IOException
- {
- //in milliseconds.
- //System.out.println(_time/1000000);
- super.close();
- }
-}
diff --git a/src/main/java/org/apache/sysds/utils/SystemDSLoaderUtils.java
b/src/main/java/org/apache/sysds/utils/SystemDSLoaderUtils.java
deleted file mode 100644
index 67761b414f..0000000000
--- a/src/main/java/org/apache/sysds/utils/SystemDSLoaderUtils.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysds.utils;
-
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.net.URL;
-import java.net.URLClassLoader;
-import java.io.File;
-import java.io.IOException;
-
-public class SystemDSLoaderUtils {
-
- public void loadSystemDS(String filePath)
- throws NoSuchMethodException, SecurityException, IllegalAccessException, IllegalArgumentException, InvocationTargetException, IOException {
- URL url = new File(filePath).toURI().toURL();
- try( URLClassLoader classLoader = (URLClassLoader)ClassLoader.getSystemClassLoader() ) {
- Method method = URLClassLoader.class.getDeclaredMethod("addURL", URL.class);
- method.setAccessible(true);
- method.invoke(classLoader, url);
- }
- }
-
-}
diff --git a/src/test/java/org/apache/sysds/test/component/misc/OpTypeTest.java b/src/test/java/org/apache/sysds/test/component/misc/OpTypeTest.java
index fabff5a459..581ff0d405 100644
--- a/src/test/java/org/apache/sysds/test/component/misc/OpTypeTest.java
+++ b/src/test/java/org/apache/sysds/test/component/misc/OpTypeTest.java
@@ -19,10 +19,6 @@
package org.apache.sysds.test.component.misc;
-import java.io.IOException;
-import java.net.URL;
-import java.util.Enumeration;
-
import org.apache.sysds.common.Types.OpOp1;
import org.apache.sysds.common.Types.OpOp2;
import org.apache.sysds.common.Types.OpOp3;
diff --git a/src/test/java/org/apache/sysds/test/component/parfor/TaskPartitionerTest.java b/src/test/java/org/apache/sysds/test/component/parfor/TaskPartitionerTest.java
new file mode 100644
index 0000000000..4e3acbe4c6
--- /dev/null
+++ b/src/test/java/org/apache/sysds/test/component/parfor/TaskPartitionerTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.component.parfor;
+
+import org.apache.sysds.runtime.controlprogram.ParForProgramBlock.PTaskPartitioner;
+import org.apache.sysds.runtime.controlprogram.parfor.LocalTaskQueue;
+import org.apache.sysds.runtime.controlprogram.parfor.Task;
+import org.apache.sysds.runtime.controlprogram.parfor.TaskPartitioner;
+import org.apache.sysds.runtime.controlprogram.parfor.TaskPartitionerFactory;
+import org.apache.sysds.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
+import org.apache.sysds.runtime.instructions.cp.IntObject;
+import org.apache.sysds.runtime.util.CommonThreadPool;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TaskPartitionerTest {
+
+ @Test
+ public void testNaive() {
+ testTaskPartitioner(2*LocalTaskQueue.MAX_SIZE, PTaskPartitioner.NAIVE);
+ }
+
+ @Test
+ public void testStatic() {
+ testTaskPartitioner(2*LocalTaskQueue.MAX_SIZE, PTaskPartitioner.STATIC);
+ }
+
+ @Test
+ public void testFixed() {
+ testTaskPartitioner(2*LocalTaskQueue.MAX_SIZE, PTaskPartitioner.FIXED);
+ }
+
+ @Test
+ public void testFactoring() {
+ testTaskPartitioner(2*LocalTaskQueue.MAX_SIZE, PTaskPartitioner.FACTORING);
+ }
+
+ @Test
+ public void testFactoring2() {
+ testTaskPartitioner(2*LocalTaskQueue.MAX_SIZE, PTaskPartitioner.FACTORING_CMIN);
+ }
+
+ @Test
+ public void testFactoring3() {
+ testTaskPartitioner(2*LocalTaskQueue.MAX_SIZE, PTaskPartitioner.FACTORING_CMAX);
+ }
+
+ private void testTaskPartitioner(int numTasks, PTaskPartitioner type) {
+ LocalTaskQueue<Task> queue = new LocalTaskQueue<>();
+ TaskPartitioner partitioner = TaskPartitionerFactory.createTaskPartitioner(
+ type, new IntObject(1), new IntObject(numTasks), new IntObject(1),
+ numTasks, InfrastructureAnalyzer.getLocalParallelism(), "i");
+ //asynchronous task creation
+ CommonThreadPool.get().submit(()->partitioner.createTasks(queue));
+ //consume tasks and check serialization
+ Task t = null;
+ try {
+ while((t = queue.dequeueTask())!=LocalTaskQueue.NO_MORE_TASKS) {
+ Task ts1 = Task.parseCompactString(t.toCompactString());
+ Task ts2 = Task.parseCompactString(t.toCompactString(10));
+ Assert.assertEquals(t.toString(), ts1.toString());
+ Assert.assertEquals(t.toString(), ts2.toString());
+ }
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git a/src/test/java/org/apache/sysds/test/functions/builtin/part2/BuiltinRaJoinTest.java b/src/test/java/org/apache/sysds/test/functions/builtin/part2/BuiltinRaJoinTest.java
index 97b820d2b1..ebaf24135c 100644
--- a/src/test/java/org/apache/sysds/test/functions/builtin/part2/BuiltinRaJoinTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/builtin/part2/BuiltinRaJoinTest.java
@@ -26,7 +26,6 @@ import org.apache.sysds.test.TestConfiguration;
import org.apache.sysds.test.TestUtils;
import org.junit.Test;
-import java.util.Arrays;
import java.util.HashMap;
public class BuiltinRaJoinTest extends AutomatedTestBase
diff --git a/src/test/java/org/apache/sysds/test/functions/parfor/misc/ParForRecursiveFunctionTest.java b/src/test/java/org/apache/sysds/test/functions/parfor/misc/ParForRecursiveFunctionTest.java
new file mode 100644
index 0000000000..d5dffb8944
--- /dev/null
+++ b/src/test/java/org/apache/sysds/test/functions/parfor/misc/ParForRecursiveFunctionTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.functions.parfor.misc;
+
+import java.util.HashMap;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.apache.sysds.common.Types.ExecMode;
+import org.apache.sysds.runtime.matrix.data.MatrixValue.CellIndex;
+import org.apache.sysds.test.AutomatedTestBase;
+import org.apache.sysds.test.TestConfiguration;
+
+public class ParForRecursiveFunctionTest extends AutomatedTestBase
+{
+ private final static String TEST_NAME1 = "parfor_recursive";
+ private final static String TEST_DIR = "functions/parfor/";
+ private final static String TEST_CLASS_DIR = TEST_DIR + ParForRecursiveFunctionTest.class.getSimpleName() + "/";
+ private final static double eps = 1e-10;
+
+ private final static int rows = 20;
+ private final static int cols = 10;
+ private final static double sparsity = 1.0;
+
+ @Override
+ public void setUp() {
+ addTestConfiguration(TEST_NAME1, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME1, new String[]{"Rout"}));
+ }
+
+ @Test
+ public void testParForCP() {
+ runParforTest(TEST_NAME1, ExecMode.SINGLE_NODE);
+ }
+
+ @Test
+ public void testParForHybrid() {
+ runParforTest(TEST_NAME1, ExecMode.HYBRID);
+ }
+
+ private void runParforTest( String TEST_NAME, ExecMode type )
+ {
+ TestConfiguration config = getTestConfiguration(TEST_NAME);
+ config.addVariable("rows", rows);
+ config.addVariable("cols", cols);
+ loadTestConfiguration(config);
+ ExecMode oldExec = setExecMode(type);
+
+ try {
+ String HOME = SCRIPT_DIR + TEST_DIR;
+ fullDMLScriptName = HOME + TEST_NAME + ".dml";
+ programArgs = new String[]{"-explain","-stats","-args", input("V"), output("R") };
+
+ double[][] V = getRandomMatrix(rows, cols, 4, 4, sparsity, 3);
+ writeInputMatrixWithMTD("V", V, true);
+
+ boolean exceptionExpected = false;
+ runTest(true, exceptionExpected, null, -1);
+
+ //compare matrices
+ HashMap<CellIndex, Double> dmlfile = readDMLMatrixFromOutputDir("R");
+ Assert.assertEquals(5d*rows*cols, dmlfile.get(new CellIndex(1,1)), eps);
+ }
+ finally {
+ resetExecMode(oldExec);
+ }
+ }
+}
diff --git a/src/test/scripts/functions/parfor/parfor_recursive.dml b/src/test/scripts/functions/parfor/parfor_recursive.dml
new file mode 100644
index 0000000000..42b6c226a2
--- /dev/null
+++ b/src/test/scripts/functions/parfor/parfor_recursive.dml
@@ -0,0 +1,37 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+fun = function(Matrix[Double] A)
+ return(Matrix[Double] B)
+{
+ B = matrix(0, nrow(A), ncol(A))
+ parfor(i in 1:nrow(A)) {
+ if( as.scalar(A[1,1]) < 5 )
+ B[i,] = fun(A[i,]+1);
+ else
+ B[i,] = A[i, ]
+ }
+}
+
+A = read($1);
+B = as.matrix(sum(fun(A)));
+write(B, $2);
+