Repository: incubator-systemml Updated Branches: refs/heads/master 09477a7b0 -> c22f239e3
[SYSTEMML-562] Frame Append operation Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/c22f239e Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/c22f239e Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/c22f239e Branch: refs/heads/master Commit: c22f239e3fd3b526190812919960684bfcf1a715 Parents: 09477a7 Author: Arvind Surve <[email protected]> Authored: Mon Jul 18 21:10:08 2016 -0700 Committer: Arvind Surve <[email protected]> Committed: Mon Jul 18 21:10:09 2016 -0700 ---------------------------------------------------------------------- .../java/org/apache/sysml/hops/BinaryOp.java | 7 +- .../controlprogram/caching/CacheableData.java | 39 +++ .../controlprogram/caching/MatrixObject.java | 44 --- .../instructions/SPInstructionParser.java | 18 +- .../instructions/cp/VariableCPInstruction.java | 18 +- .../spark/AppendMSPInstruction.java | 257 +---------------- .../spark/AppendRSPInstruction.java | 85 +----- .../spark/FrameAppendMSPInstruction.java | 157 ++++++++++ .../spark/FrameAppendRSPInstruction.java | 170 +++++++++++ .../spark/MatrixAppendMSPInstruction.java | 284 +++++++++++++++++++ .../spark/MatrixAppendRSPInstruction.java | 112 ++++++++ .../sysml/runtime/matrix/data/FrameBlock.java | 2 + .../sysml/runtime/util/UtilFunctions.java | 19 ++ .../functions/frame/FrameAppendDistTest.java | 226 +++++++++++++++ src/test/scripts/functions/frame/FrameAppend.R | 33 +++ .../scripts/functions/frame/FrameAppend.dml | 29 ++ 16 files changed, 1108 insertions(+), 392 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c22f239e/src/main/java/org/apache/sysml/hops/BinaryOp.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/hops/BinaryOp.java 
b/src/main/java/org/apache/sysml/hops/BinaryOp.java index 94de0e7..65e9232 100644 --- a/src/main/java/org/apache/sysml/hops/BinaryOp.java +++ b/src/main/java/org/apache/sysml/hops/BinaryOp.java @@ -1125,7 +1125,7 @@ public class BinaryOp extends Hop Lop offset = createOffsetLop( left, cbind ); //offset 1st input AppendMethod am = optFindAppendSPMethod(left.getDim1(), left.getDim2(), right.getDim1(), right.getDim2(), - right.getRowsInBlock(), right.getColsInBlock(), right.getNnz(), cbind); + right.getRowsInBlock(), right.getColsInBlock(), right.getNnz(), cbind, dt); switch( am ) { @@ -1281,16 +1281,17 @@ public class BinaryOp extends Hop return AppendMethod.MR_GAPPEND; } - private static AppendMethod optFindAppendSPMethod( long m1_dim1, long m1_dim2, long m2_dim1, long m2_dim2, long m1_rpb, long m1_cpb, long m2_nnz, boolean cbind ) + private static AppendMethod optFindAppendSPMethod( long m1_dim1, long m1_dim2, long m2_dim1, long m2_dim2, long m1_rpb, long m1_cpb, long m2_nnz, boolean cbind, DataType dt ) { if(FORCED_APPEND_METHOD != null) { return FORCED_APPEND_METHOD; } //check for best case (map-only w/o shuffle) - if( m2_dim1 >= 1 && m2_dim2 >= 1 //rhs dims known + if(( m2_dim1 >= 1 && m2_dim2 >= 1 //rhs dims known && (cbind && m2_dim2 <= m1_cpb //rhs is smaller than column block || !cbind && m2_dim1 <= m1_rpb) ) //rhs is smaller than row block + && ((dt == DataType.MATRIX) || (dt == DataType.FRAME && cbind))) { if( OptimizerUtils.checkSparkBroadcastMemoryBudget(m2_dim1, m2_dim2, m1_rpb, m1_cpb, m2_nnz) ) { return AppendMethod.MR_MAPPEND; http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c22f239e/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheableData.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheableData.java b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheableData.java index d60c607..2b45ddd 100644 
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheableData.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheableData.java @@ -1422,4 +1422,43 @@ public abstract class CacheableData<T extends CacheBlock> extends Data public static synchronized void enableCaching() { _activeFlag = true; } + + /** + * + * @param fName + * @param outputFormat + * @return + * @throws CacheException + */ + public synchronized boolean moveData(String fName, String outputFormat) + throws CacheException + { + boolean ret = false; + + try + { + //export or rename to target file on hdfs + if( (isDirty() || (!isEqualOutputFormat(outputFormat) && isEmpty(true))) || + (getRDDHandle() != null && !MapReduceTool.existsFileOnHDFS(_hdfsFileName))) + { + exportData(fName, outputFormat); + ret = true; + } + else if( isEqualOutputFormat(outputFormat) ) + { + MapReduceTool.deleteFileIfExistOnHDFS(fName); + MapReduceTool.deleteFileIfExistOnHDFS(fName+".mtd"); + MapReduceTool.renameFileOnHDFS( _hdfsFileName, fName ); + writeMetaData( fName, outputFormat, null ); + ret = true; + } + } + catch (Exception e) + { + throw new CacheException ("Move to " + fName + " failed.", e); + } + + return ret; + } + } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c22f239e/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java index 5d5f41f..4148545 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java @@ -282,50 +282,6 @@ public class MatrixObject extends CacheableData<MatrixBlock> return str.toString(); } - - /** - * - * @param fName - * @param outputFormat - * 
@return - * @throws CacheException - */ - public synchronized boolean moveData(String fName, String outputFormat) - throws CacheException - { - boolean ret = false; - - try - { - //ensure input file is persistent on hdfs (pending RDD operations), - //file might have been written during export or collect via write/read - if( getRDDHandle() != null && !MapReduceTool.existsFileOnHDFS(_hdfsFileName) ) { - writeBlobFromRDDtoHDFS(getRDDHandle(), _hdfsFileName, outputFormat); - } - - //export or rename to target file on hdfs - if( isDirty() || (!isEqualOutputFormat(outputFormat) && isEmpty(true))) - { - exportData(fName, outputFormat); - ret = true; - } - else if( isEqualOutputFormat(outputFormat) ) - { - MapReduceTool.deleteFileIfExistOnHDFS(fName); - MapReduceTool.deleteFileIfExistOnHDFS(fName+".mtd"); - MapReduceTool.renameFileOnHDFS( _hdfsFileName, fName ); - writeMetaData( fName, outputFormat, null ); - ret = true; - } - } - catch (Exception e) - { - throw new CacheException ("Move to " + fName + " failed.", e); - } - - return ret; - } - // ********************************************* // *** *** http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c22f239e/src/main/java/org/apache/sysml/runtime/instructions/SPInstructionParser.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/SPInstructionParser.java b/src/main/java/org/apache/sysml/runtime/instructions/SPInstructionParser.java index e8437fb..c74b44e 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/SPInstructionParser.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/SPInstructionParser.java @@ -34,13 +34,12 @@ import org.apache.sysml.lops.WeightedSquaredLoss; import org.apache.sysml.lops.WeightedSquaredLossR; import org.apache.sysml.lops.WeightedUnaryMM; import org.apache.sysml.lops.WeightedUnaryMMR; +import org.apache.sysml.parser.Expression.DataType; import 
org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.instructions.spark.AggregateTernarySPInstruction; import org.apache.sysml.runtime.instructions.spark.AggregateUnarySPInstruction; import org.apache.sysml.runtime.instructions.spark.AppendGAlignedSPInstruction; import org.apache.sysml.runtime.instructions.spark.AppendGSPInstruction; -import org.apache.sysml.runtime.instructions.spark.AppendMSPInstruction; -import org.apache.sysml.runtime.instructions.spark.AppendRSPInstruction; import org.apache.sysml.runtime.instructions.spark.ArithmeticBinarySPInstruction; import org.apache.sysml.runtime.instructions.spark.BinUaggChainSPInstruction; import org.apache.sysml.runtime.instructions.spark.BuiltinBinarySPInstruction; @@ -54,9 +53,13 @@ import org.apache.sysml.runtime.instructions.spark.CovarianceSPInstruction; import org.apache.sysml.runtime.instructions.spark.CpmmSPInstruction; import org.apache.sysml.runtime.instructions.spark.CumulativeAggregateSPInstruction; import org.apache.sysml.runtime.instructions.spark.CumulativeOffsetSPInstruction; +import org.apache.sysml.runtime.instructions.spark.FrameAppendMSPInstruction; +import org.apache.sysml.runtime.instructions.spark.FrameAppendRSPInstruction; import org.apache.sysml.runtime.instructions.spark.IndexingSPInstruction; import org.apache.sysml.runtime.instructions.spark.MapmmChainSPInstruction; import org.apache.sysml.runtime.instructions.spark.MapmmSPInstruction; +import org.apache.sysml.runtime.instructions.spark.MatrixAppendMSPInstruction; +import org.apache.sysml.runtime.instructions.spark.MatrixAppendRSPInstruction; import org.apache.sysml.runtime.instructions.spark.MatrixReshapeSPInstruction; import org.apache.sysml.runtime.instructions.spark.MultiReturnParameterizedBuiltinSPInstruction; import org.apache.sysml.runtime.instructions.spark.PMapmmSPInstruction; @@ -78,6 +81,7 @@ import org.apache.sysml.runtime.instructions.spark.QuantileSortSPInstruction; import 
org.apache.sysml.runtime.instructions.spark.UaggOuterChainSPInstruction; import org.apache.sysml.runtime.instructions.spark.WriteSPInstruction; import org.apache.sysml.runtime.instructions.spark.ZipmmSPInstruction; +import org.apache.sysml.runtime.util.UtilFunctions; public class SPInstructionParser extends InstructionParser @@ -389,7 +393,10 @@ public class SPInstructionParser extends InstructionParser return MatrixReshapeSPInstruction.parseInstruction(str); case MAppend: - return AppendMSPInstruction.parseInstruction(str); + if(UtilFunctions.getDataType(str, 1) == DataType.MATRIX) + return MatrixAppendMSPInstruction.parseInstruction(str); + else + return FrameAppendMSPInstruction.parseInstruction(str); case GAppend: return AppendGSPInstruction.parseInstruction(str); @@ -398,7 +405,10 @@ public class SPInstructionParser extends InstructionParser return AppendGAlignedSPInstruction.parseInstruction(str); case RAppend: - return AppendRSPInstruction.parseInstruction(str); + if(UtilFunctions.getDataType(str, 1) == DataType.MATRIX) + return MatrixAppendRSPInstruction.parseInstruction(str); + else + return FrameAppendRSPInstruction.parseInstruction(str); case Rand: return RandSPInstruction.parseInstruction(str); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c22f239e/src/main/java/org/apache/sysml/runtime/instructions/cp/VariableCPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/cp/VariableCPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/cp/VariableCPInstruction.java index 1fae8fc..ee86b59 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/cp/VariableCPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/cp/VariableCPInstruction.java @@ -28,6 +28,7 @@ import org.apache.sysml.lops.UnaryCP; import org.apache.sysml.parser.Expression.DataType; import org.apache.sysml.parser.Expression.ValueType; 
import org.apache.sysml.runtime.DMLRuntimeException; +import org.apache.sysml.runtime.controlprogram.caching.CacheableData; import org.apache.sysml.runtime.controlprogram.caching.FrameObject; import org.apache.sysml.runtime.controlprogram.caching.MatrixObject; import org.apache.sysml.runtime.controlprogram.caching.MatrixObject.UpdateType; @@ -655,6 +656,7 @@ public class VariableCPInstruction extends CPInstruction * @param ec * @throws DMLRuntimeException */ + @SuppressWarnings("rawtypes") private void processMoveInstruction(ExecutionContext ec) throws DMLRuntimeException { if ( input3 == null ) { @@ -683,16 +685,22 @@ public class VariableCPInstruction extends CPInstruction if ( ec.getVariable(input1.getName()) == null ) throw new DMLRuntimeException("Unexpected error: could not find a data object for variable name:" + input1.getName() + ", while processing instruction " +this.toString()); - MatrixObject mo = (MatrixObject) ec.getVariable(input1.getName()); + Object object = ec.getVariable(input1.getName()); + if ( input3.getName().equalsIgnoreCase("binaryblock") ) { - boolean success = mo.moveData(input2.getName(), input3.getName()); + boolean success = false; + success = ((CacheableData)object).moveData(input2.getName(), input3.getName()); if (!success) { throw new DMLRuntimeException("Failed to move var " + input1.getName() + " to file " + input2.getName() + "."); } } - else - throw new DMLRuntimeException("Unexpected formats while copying: from blocks [" - + mo.getNumRowsPerBlock() + "," + mo.getNumColumnsPerBlock() + "] to " + input3.getName()); + else + if(object instanceof MatrixObject) + throw new DMLRuntimeException("Unexpected formats while copying: from matrix blocks [" + + ((MatrixObject)object).getNumRowsPerBlock() + "," + ((MatrixObject)object).getNumColumnsPerBlock() + "] to " + input3.getName()); + else if (object instanceof FrameObject) + throw new DMLRuntimeException("Unexpected formats while copying: from fram object [" + + 
((FrameObject)object).getNumColumns() + "," + ((FrameObject)object).getNumColumns() + "] to " + input3.getName()); } } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c22f239e/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendMSPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendMSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendMSPInstruction.java index 26be3eb..eaf23d5 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendMSPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendMSPInstruction.java @@ -19,35 +19,14 @@ package org.apache.sysml.runtime.instructions.spark; -import java.util.ArrayList; -import java.util.Iterator; -import org.apache.spark.api.java.JavaPairRDD; -import org.apache.spark.api.java.function.PairFlatMapFunction; - -import scala.Tuple2; - -import org.apache.sysml.runtime.DMLRuntimeException; -import org.apache.sysml.runtime.controlprogram.context.ExecutionContext; -import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext; -import org.apache.sysml.runtime.functionobjects.OffsetColumnIndex; -import org.apache.sysml.runtime.instructions.InstructionUtils; import org.apache.sysml.runtime.instructions.cp.CPOperand; -import org.apache.sysml.runtime.instructions.spark.data.LazyIterableIterator; -import org.apache.sysml.runtime.instructions.spark.data.PartitionedBroadcast; -import org.apache.sysml.runtime.instructions.spark.utils.SparkUtils; -import org.apache.sysml.runtime.matrix.MatrixCharacteristics; -import org.apache.sysml.runtime.matrix.data.MatrixBlock; -import org.apache.sysml.runtime.matrix.data.MatrixIndexes; -import org.apache.sysml.runtime.matrix.data.OperationsOnMatrixValues; -import org.apache.sysml.runtime.matrix.mapred.IndexedMatrixValue; import 
org.apache.sysml.runtime.matrix.operators.Operator; -import org.apache.sysml.runtime.matrix.operators.ReorgOperator; -public class AppendMSPInstruction extends BinarySPInstruction +public abstract class AppendMSPInstruction extends BinarySPInstruction { - private CPOperand _offset = null; - private boolean _cbind = true; + protected CPOperand _offset = null; + protected boolean _cbind = true; public AppendMSPInstruction(Operator op, CPOperand in1, CPOperand in2, CPOperand offset, CPOperand out, boolean cbind, String opcode, String istr) { @@ -56,235 +35,5 @@ public class AppendMSPInstruction extends BinarySPInstruction _offset = offset; _cbind = cbind; } - - public static AppendMSPInstruction parseInstruction ( String str ) - throws DMLRuntimeException - { - String[] parts = InstructionUtils.getInstructionPartsWithValueType(str); - InstructionUtils.checkNumFields (parts, 5); - - String opcode = parts[0]; - CPOperand in1 = new CPOperand(parts[1]); - CPOperand in2 = new CPOperand(parts[2]); - CPOperand offset = new CPOperand(parts[3]); - CPOperand out = new CPOperand(parts[4]); - boolean cbind = Boolean.parseBoolean(parts[5]); - - if(!opcode.equalsIgnoreCase("mappend")) - throw new DMLRuntimeException("Unknown opcode while parsing a AppendMSPInstruction: " + str); - - return new AppendMSPInstruction( - new ReorgOperator(OffsetColumnIndex.getOffsetColumnIndexFnObject(-1)), - in1, in2, offset, out, cbind, opcode, str); - } - - @Override - public void processInstruction(ExecutionContext ec) - throws DMLRuntimeException - { - // map-only append (rhs must be vector and fit in mapper mem) - SparkExecutionContext sec = (SparkExecutionContext)ec; - checkBinaryAppendInputCharacteristics(sec, _cbind, false, false); - MatrixCharacteristics mc1 = sec.getMatrixCharacteristics(input1.getName()); - MatrixCharacteristics mc2 = sec.getMatrixCharacteristics(input2.getName()); - int brlen = mc1.getRowsPerBlock(); - int bclen = mc1.getColsPerBlock(); - - 
JavaPairRDD<MatrixIndexes,MatrixBlock> in1 = sec.getBinaryBlockRDDHandleForVariable( input1.getName() ); - PartitionedBroadcast<MatrixBlock> in2 = sec.getBroadcastForVariable( input2.getName() ); - long off = sec.getScalarInput( _offset.getName(), _offset.getValueType(), _offset.isLiteral()).getLongValue(); - - //execute map-append operations (partitioning preserving if #in-blocks = #out-blocks) - JavaPairRDD<MatrixIndexes,MatrixBlock> out = null; - if( preservesPartitioning(mc1, mc2, _cbind) ) { - out = in1.mapPartitionsToPair( - new MapSideAppendPartitionFunction(in2, _cbind, off, brlen, bclen), true); - } - else { - out = in1.flatMapToPair( - new MapSideAppendFunction(in2, _cbind, off, brlen, bclen)); - } - - //put output RDD handle into symbol table - updateBinaryAppendOutputMatrixCharacteristics(sec, _cbind); - sec.setRDDHandleForVariable(output.getName(), out); - sec.addLineageRDD(output.getName(), input1.getName()); - sec.addLineageBroadcast(output.getName(), input2.getName()); - } - - /** - * - * @param mcIn1 - * @param mcIn2 - * @return - */ - private boolean preservesPartitioning( MatrixCharacteristics mcIn1, MatrixCharacteristics mcIn2, boolean cbind ) - { - long ncblksIn1 = cbind ? - (long)Math.ceil((double)mcIn1.getCols()/mcIn1.getColsPerBlock()) : - (long)Math.ceil((double)mcIn1.getRows()/mcIn1.getRowsPerBlock()); - long ncblksOut = cbind ? 
- (long)Math.ceil(((double)mcIn1.getCols()+mcIn2.getCols())/mcIn1.getColsPerBlock()) : - (long)Math.ceil(((double)mcIn1.getRows()+mcIn2.getRows())/mcIn1.getRowsPerBlock()); - - //mappend is partitioning-preserving if in-block append (e.g., common case of colvector append) - return (ncblksIn1 == ncblksOut); - } - - /** - * - */ - private static class MapSideAppendFunction implements PairFlatMapFunction<Tuple2<MatrixIndexes,MatrixBlock>, MatrixIndexes, MatrixBlock> - { - private static final long serialVersionUID = 2738541014432173450L; - - private PartitionedBroadcast<MatrixBlock> _pm = null; - private boolean _cbind = true; - private long _offset; - private int _brlen; - private int _bclen; - private long _lastBlockColIndex; - - public MapSideAppendFunction(PartitionedBroadcast<MatrixBlock> binput, boolean cbind, long offset, int brlen, int bclen) - { - _pm = binput; - _cbind = cbind; - - _offset = offset; - _brlen = brlen; - _bclen = bclen; - - //check for boundary block - int blen = cbind ? 
bclen : brlen; - _lastBlockColIndex = (long)Math.ceil((double)_offset/blen); - } - - @Override - public Iterable<Tuple2<MatrixIndexes, MatrixBlock>> call(Tuple2<MatrixIndexes, MatrixBlock> kv) - throws Exception - { - ArrayList<Tuple2<MatrixIndexes, MatrixBlock>> ret = new ArrayList<Tuple2<MatrixIndexes, MatrixBlock>>(); - - IndexedMatrixValue in1 = SparkUtils.toIndexedMatrixBlock(kv); - MatrixIndexes ix = in1.getIndexes(); - - //case 1: pass through of non-boundary blocks - if( (_cbind?ix.getColumnIndex():ix.getRowIndex())!=_lastBlockColIndex ) - { - ret.add( kv ); - } - //case 2: pass through full input block and rhs block - else if( _cbind && in1.getValue().getNumColumns() == _bclen - || !_cbind && in1.getValue().getNumRows() == _brlen) - { - //output lhs block - ret.add( kv ); - - //output shallow copy of rhs block - if( _cbind ) { - ret.add( new Tuple2<MatrixIndexes, MatrixBlock>( - new MatrixIndexes(ix.getRowIndex(), ix.getColumnIndex()+1), - _pm.getBlock((int)ix.getRowIndex(), 1)) ); - } - else { //rbind - ret.add( new Tuple2<MatrixIndexes, MatrixBlock>( - new MatrixIndexes(ix.getRowIndex()+1, ix.getColumnIndex()), - _pm.getBlock(1, (int)ix.getColumnIndex())) ); - } - } - //case 3: append operation on boundary block - else - { - //allocate space for the output value - ArrayList<IndexedMatrixValue> outlist=new ArrayList<IndexedMatrixValue>(2); - IndexedMatrixValue first = new IndexedMatrixValue(new MatrixIndexes(ix), new MatrixBlock()); - outlist.add(first); - - MatrixBlock value_in2 = null; - if( _cbind ) { - value_in2 = _pm.getBlock((int)ix.getRowIndex(), 1); - if(in1.getValue().getNumColumns()+value_in2.getNumColumns()>_bclen) { - IndexedMatrixValue second=new IndexedMatrixValue(new MatrixIndexes(), new MatrixBlock()); - second.getIndexes().setIndexes(ix.getRowIndex(), ix.getColumnIndex()+1); - outlist.add(second); - } - } - else { //rbind - value_in2 = _pm.getBlock(1, (int)ix.getColumnIndex()); - 
if(in1.getValue().getNumRows()+value_in2.getNumRows()>_brlen) { - IndexedMatrixValue second=new IndexedMatrixValue(new MatrixIndexes(), new MatrixBlock()); - second.getIndexes().setIndexes(ix.getRowIndex()+1, ix.getColumnIndex()); - outlist.add(second); - } - } - - OperationsOnMatrixValues.performAppend(in1.getValue(), value_in2, outlist, _brlen, _bclen, _cbind, true, 0); - ret.addAll(SparkUtils.fromIndexedMatrixBlock(outlist)); - } - - return ret; - } - } - - /** - * - */ - private static class MapSideAppendPartitionFunction implements PairFlatMapFunction<Iterator<Tuple2<MatrixIndexes,MatrixBlock>>, MatrixIndexes, MatrixBlock> - { - private static final long serialVersionUID = 5767240739761027220L; - - private PartitionedBroadcast<MatrixBlock> _pm = null; - private boolean _cbind = true; - private long _lastBlockColIndex = -1; - - public MapSideAppendPartitionFunction(PartitionedBroadcast<MatrixBlock> binput, boolean cbind, long offset, int brlen, int bclen) - { - _pm = binput; - _cbind = cbind; - - //check for boundary block - int blen = cbind ? bclen : brlen; - _lastBlockColIndex = (long)Math.ceil((double)offset/blen); - } - @Override - public Iterable<Tuple2<MatrixIndexes, MatrixBlock>> call(Iterator<Tuple2<MatrixIndexes, MatrixBlock>> arg0) - throws Exception - { - return new MapAppendPartitionIterator(arg0); - } - - /** - * Lazy mappend iterator to prevent materialization of entire partition output in-memory. - * The implementation via mapPartitions is required to preserve partitioning information, - * which is important for performance. 
- */ - private class MapAppendPartitionIterator extends LazyIterableIterator<Tuple2<MatrixIndexes, MatrixBlock>> - { - public MapAppendPartitionIterator(Iterator<Tuple2<MatrixIndexes, MatrixBlock>> in) { - super(in); - } - - @Override - protected Tuple2<MatrixIndexes, MatrixBlock> computeNext(Tuple2<MatrixIndexes, MatrixBlock> arg) - throws Exception - { - MatrixIndexes ix = arg._1(); - MatrixBlock in1 = arg._2(); - - //case 1: pass through of non-boundary blocks - if( (_cbind?ix.getColumnIndex():ix.getRowIndex()) != _lastBlockColIndex ) { - return arg; - } - //case 3: append operation on boundary block - else { - int rowix = _cbind ? (int)ix.getRowIndex() : 1; - int colix = _cbind ? 1 : (int)ix.getColumnIndex(); - MatrixBlock in2 = _pm.getBlock(rowix, colix); - MatrixBlock out = in1.appendOperations(in2, new MatrixBlock(), _cbind); - return new Tuple2<MatrixIndexes,MatrixBlock>(ix, out); - } - } - } - } } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c22f239e/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendRSPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendRSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendRSPInstruction.java index 93fc7aa..6d3cf5e 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendRSPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendRSPInstruction.java @@ -19,25 +19,13 @@ package org.apache.sysml.runtime.instructions.spark; -import org.apache.spark.api.java.JavaPairRDD; -import org.apache.spark.api.java.function.Function; - -import scala.Tuple2; - -import org.apache.sysml.runtime.DMLRuntimeException; -import org.apache.sysml.runtime.controlprogram.context.ExecutionContext; -import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext; -import 
org.apache.sysml.runtime.functionobjects.OffsetColumnIndex; -import org.apache.sysml.runtime.instructions.InstructionUtils; import org.apache.sysml.runtime.instructions.cp.CPOperand; -import org.apache.sysml.runtime.matrix.data.MatrixBlock; -import org.apache.sysml.runtime.matrix.data.MatrixIndexes; import org.apache.sysml.runtime.matrix.operators.Operator; -import org.apache.sysml.runtime.matrix.operators.ReorgOperator; -public class AppendRSPInstruction extends BinarySPInstruction + +public abstract class AppendRSPInstruction extends BinarySPInstruction { - private boolean _cbind = true; + protected boolean _cbind = true; public AppendRSPInstruction(Operator op, CPOperand in1, CPOperand in2, CPOperand out, boolean cbind, String opcode, String istr) { @@ -45,72 +33,5 @@ public class AppendRSPInstruction extends BinarySPInstruction _sptype = SPINSTRUCTION_TYPE.RAppend; _cbind = cbind; } - - public static AppendRSPInstruction parseInstruction ( String str ) - throws DMLRuntimeException - { - String[] parts = InstructionUtils.getInstructionPartsWithValueType(str); - InstructionUtils.checkNumFields (parts, 4); - - String opcode = parts[0]; - CPOperand in1 = new CPOperand(parts[1]); - CPOperand in2 = new CPOperand(parts[2]); - CPOperand out = new CPOperand(parts[3]); - boolean cbind = Boolean.parseBoolean(parts[4]); - - if(!opcode.equalsIgnoreCase("rappend")) - throw new DMLRuntimeException("Unknown opcode while parsing a AppendRSPInstruction: " + str); - - return new AppendRSPInstruction( - new ReorgOperator(OffsetColumnIndex.getOffsetColumnIndexFnObject(-1)), - in1, in2, out, cbind, opcode, str); - } - - @Override - public void processInstruction(ExecutionContext ec) - throws DMLRuntimeException - { - // reduce-only append (output must have at most one column block) - SparkExecutionContext sec = (SparkExecutionContext)ec; - checkBinaryAppendInputCharacteristics(sec, _cbind, true, false); - - JavaPairRDD<MatrixIndexes,MatrixBlock> in1 = 
sec.getBinaryBlockRDDHandleForVariable( input1.getName() ); - JavaPairRDD<MatrixIndexes,MatrixBlock> in2 = sec.getBinaryBlockRDDHandleForVariable( input2.getName() ); - - //execute reduce-append operations (partitioning preserving) - JavaPairRDD<MatrixIndexes,MatrixBlock> out = in1 - .join(in2) - .mapValues(new ReduceSideAppendFunction(_cbind)); - - //put output RDD handle into symbol table - updateBinaryAppendOutputMatrixCharacteristics(sec, _cbind); - sec.setRDDHandleForVariable(output.getName(), out); - sec.addLineageRDD(output.getName(), input1.getName()); - sec.addLineageRDD(output.getName(), input2.getName()); - } - - /** - * - */ - private static class ReduceSideAppendFunction implements Function<Tuple2<MatrixBlock, MatrixBlock>, MatrixBlock> - { - private static final long serialVersionUID = -6763904972560309095L; - - private boolean _cbind = true; - - public ReduceSideAppendFunction(boolean cbind) { - _cbind = cbind; - } - - @Override - public MatrixBlock call(Tuple2<MatrixBlock, MatrixBlock> arg0) - throws Exception - { - MatrixBlock left = arg0._1(); - MatrixBlock right = arg0._2(); - - return left.appendOperations(right, new MatrixBlock(), _cbind); - } - } } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c22f239e/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameAppendMSPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameAppendMSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameAppendMSPInstruction.java new file mode 100644 index 0000000..b67f364 --- /dev/null +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameAppendMSPInstruction.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysml.runtime.instructions.spark; + +import java.util.Iterator; + +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.function.PairFlatMapFunction; + +import scala.Tuple2; + +import org.apache.sysml.hops.OptimizerUtils; +import org.apache.sysml.runtime.DMLRuntimeException; +import org.apache.sysml.runtime.controlprogram.context.ExecutionContext; +import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext; +import org.apache.sysml.runtime.functionobjects.OffsetColumnIndex; +import org.apache.sysml.runtime.instructions.InstructionUtils; +import org.apache.sysml.runtime.instructions.cp.CPOperand; +import org.apache.sysml.runtime.instructions.spark.data.LazyIterableIterator; +import org.apache.sysml.runtime.instructions.spark.data.PartitionedBroadcast; +import org.apache.sysml.runtime.matrix.data.FrameBlock; +import org.apache.sysml.runtime.matrix.operators.Operator; +import org.apache.sysml.runtime.matrix.operators.ReorgOperator; + +public class FrameAppendMSPInstruction extends AppendMSPInstruction +{ + public FrameAppendMSPInstruction(Operator op, CPOperand in1, CPOperand in2, CPOperand offset, CPOperand out, boolean cbind, String opcode, String istr) + { + super(op, in1, in2, offset, out, cbind, 
opcode, istr); + } + + public static FrameAppendMSPInstruction parseInstruction ( String str ) + throws DMLRuntimeException + { + String[] parts = InstructionUtils.getInstructionPartsWithValueType(str); + InstructionUtils.checkNumFields (parts, 5); + + String opcode = parts[0]; + CPOperand in1 = new CPOperand(parts[1]); + CPOperand in2 = new CPOperand(parts[2]); + CPOperand offset = new CPOperand(parts[3]); + CPOperand out = new CPOperand(parts[4]); + boolean cbind = Boolean.parseBoolean(parts[5]); + + if(!opcode.equalsIgnoreCase("mappend")) + throw new DMLRuntimeException("Unknown opcode while parsing a FrameAppendMSPInstruction: " + str); + + return new FrameAppendMSPInstruction( + new ReorgOperator(OffsetColumnIndex.getOffsetColumnIndexFnObject(-1)), + in1, in2, offset, out, cbind, opcode, str); + } + + @Override + public void processInstruction(ExecutionContext ec) + throws DMLRuntimeException + { + // map-only append (rhs must be vector and fit in mapper mem) + SparkExecutionContext sec = (SparkExecutionContext)ec; + checkBinaryAppendInputCharacteristics(sec, _cbind, false, false); + + JavaPairRDD<Long,FrameBlock> in1 = sec.getFrameBinaryBlockRDDHandleForVariable( input1.getName() ); + PartitionedBroadcast<FrameBlock> in2 = sec.getBroadcastForFrameVariable( input2.getName() ); + + //execute map-append operations (partitioning preserving if keys for blocks not changing) + JavaPairRDD<Long,FrameBlock> out = null; + if( preservesPartitioning(_cbind) ) { + out = in1.mapPartitionsToPair( + new MapSideAppendPartitionFunction(in2), true); + } + else + throw new DMLRuntimeException("Append type rbind not supported for frame mappend, instead use rappend"); + + //put output RDD handle into symbol table + updateBinaryAppendOutputMatrixCharacteristics(sec, _cbind); + sec.setRDDHandleForVariable(output.getName(), out); + sec.addLineageRDD(output.getName(), input1.getName()); + sec.addLineageBroadcast(output.getName(), input2.getName()); + } + + /** + * + * @param cbind + * 
@return + */ + private boolean preservesPartitioning( boolean cbind ) + { + //Partitions for input1 will be preserved in case of cbind, + // whereas in case of rbind partitions will not be preserved. + return cbind; + } + + /** + * + */ + private static class MapSideAppendPartitionFunction implements PairFlatMapFunction<Iterator<Tuple2<Long,FrameBlock>>, Long, FrameBlock> + { + private static final long serialVersionUID = -3997051891171313830L; + + private PartitionedBroadcast<FrameBlock> _pm = null; + + public MapSideAppendPartitionFunction(PartitionedBroadcast<FrameBlock> binput) + { + _pm = binput; + } + + @Override + public Iterable<Tuple2<Long, FrameBlock>> call(Iterator<Tuple2<Long, FrameBlock>> arg0) + throws Exception + { + return new MapAppendPartitionIterator(arg0); + } + + /** + * Lazy mappend iterator to prevent materialization of entire partition output in-memory. + * The implementation via mapPartitions is required to preserve partitioning information, + * which is important for performance. 
+ */ + private class MapAppendPartitionIterator extends LazyIterableIterator<Tuple2<Long, FrameBlock>> + { + public MapAppendPartitionIterator(Iterator<Tuple2<Long, FrameBlock>> in) { + super(in); + } + + @Override + protected Tuple2<Long, FrameBlock> computeNext(Tuple2<Long, FrameBlock> arg) + throws Exception + { + Long ix = arg._1(); + FrameBlock in1 = arg._2(); + + int rowix = (ix.intValue()-1)/OptimizerUtils.DEFAULT_FRAME_BLOCKSIZE+1; + int colix = 1; + + FrameBlock in2 = _pm.getBlock(rowix, colix); + FrameBlock out = in1.appendOperations(in2, new FrameBlock(), true); //cbind + return new Tuple2<Long,FrameBlock>(ix, out); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c22f239e/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameAppendRSPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameAppendRSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameAppendRSPInstruction.java new file mode 100644 index 0000000..c627e40 --- /dev/null +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameAppendRSPInstruction.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysml.runtime.instructions.spark; + +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.function.Function; +import org.apache.spark.api.java.function.PairFunction; + +import scala.Tuple2; + +import org.apache.sysml.hops.OptimizerUtils; +import org.apache.sysml.runtime.DMLRuntimeException; +import org.apache.sysml.runtime.controlprogram.context.ExecutionContext; +import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext; +import org.apache.sysml.runtime.functionobjects.OffsetColumnIndex; +import org.apache.sysml.runtime.instructions.InstructionUtils; +import org.apache.sysml.runtime.instructions.cp.CPOperand; +import org.apache.sysml.runtime.instructions.spark.utils.RDDAggregateUtils; +import org.apache.sysml.runtime.matrix.data.FrameBlock; +import org.apache.sysml.runtime.matrix.operators.Operator; +import org.apache.sysml.runtime.matrix.operators.ReorgOperator; + +public class FrameAppendRSPInstruction extends AppendRSPInstruction +{ + public FrameAppendRSPInstruction(Operator op, CPOperand in1, CPOperand in2, CPOperand out, boolean cbind, String opcode, String istr) + { + super(op, in1, in2, out, cbind, opcode, istr); + } + + public static FrameAppendRSPInstruction parseInstruction ( String str ) + throws DMLRuntimeException + { + String[] parts = InstructionUtils.getInstructionPartsWithValueType(str); + InstructionUtils.checkNumFields (parts, 4); + + String opcode = parts[0]; + CPOperand in1 = new CPOperand(parts[1]); + CPOperand in2 = new CPOperand(parts[2]); + CPOperand out = new CPOperand(parts[3]); + boolean cbind = Boolean.parseBoolean(parts[4]); + + if(!opcode.equalsIgnoreCase("rappend")) + throw new DMLRuntimeException("Unknown opcode while parsing a FrameAppendRSPInstruction: " + str); + + return new FrameAppendRSPInstruction( + new 
ReorgOperator(OffsetColumnIndex.getOffsetColumnIndexFnObject(-1)), + in1, in2, out, cbind, opcode, str); + } + + @Override + public void processInstruction(ExecutionContext ec) + throws DMLRuntimeException + { + SparkExecutionContext sec = (SparkExecutionContext)ec; + JavaPairRDD<Long,FrameBlock> in1 = sec.getFrameBinaryBlockRDDHandleForVariable( input1.getName() ); + JavaPairRDD<Long,FrameBlock> in2 = sec.getFrameBinaryBlockRDDHandleForVariable( input2.getName() ); + JavaPairRDD<Long,FrameBlock> out = null; + long leftRows = sec.getMatrixCharacteristics(input1.getName()).getRows(); + + if(_cbind) { + JavaPairRDD<Long,FrameBlock> in1Aligned = in1.mapToPair(new ReduceSideAppendAlignFunction(leftRows)); + in1Aligned = (JavaPairRDD<Long, FrameBlock>) RDDAggregateUtils.mergeByFrameKey(in1Aligned); + JavaPairRDD<Long,FrameBlock> in2Aligned = in2.mapToPair(new ReduceSideAppendAlignFunction(leftRows)); + in2Aligned = (JavaPairRDD<Long, FrameBlock>) RDDAggregateUtils.mergeByFrameKey(in2Aligned); + + out = in1Aligned.join(in2Aligned).mapValues(new ReduceSideColumnsFunction(_cbind)); + } else { //rbind + JavaPairRDD<Long,FrameBlock> right = in2.mapToPair( new ReduceSideAppendRowsFunction(leftRows)); + out = in1.union(right); + } + + //put output RDD handle into symbol table + updateBinaryAppendOutputMatrixCharacteristics(sec, _cbind); + sec.setRDDHandleForVariable(output.getName(), out); + sec.addLineageRDD(output.getName(), input1.getName()); + sec.addLineageRDD(output.getName(), input2.getName()); + } + + /** + * + */ + private static class ReduceSideColumnsFunction implements Function<Tuple2<FrameBlock, FrameBlock>, FrameBlock> + { + private static final long serialVersionUID = -97824903649667646L; + + private boolean _cbind = true; + + public ReduceSideColumnsFunction(boolean cbind) { + _cbind = cbind; + } + + @Override + public FrameBlock call(Tuple2<FrameBlock, FrameBlock> arg0) + throws Exception + { + FrameBlock left = arg0._1(); + FrameBlock right = arg0._2(); + + 
return left.appendOperations(right, new FrameBlock(), _cbind); + } + } + + /** + * + */ + private static class ReduceSideAppendRowsFunction implements PairFunction<Tuple2<Long, FrameBlock>, Long, FrameBlock> + { + private static final long serialVersionUID = 1723795153048336791L; + + private long _offset; + + public ReduceSideAppendRowsFunction(long offset) { + _offset = offset; + } + + @Override + public Tuple2<Long,FrameBlock> call(Tuple2<Long, FrameBlock> arg0) + throws Exception + { + return new Tuple2<Long, FrameBlock>(arg0._1()+_offset, arg0._2()); + } + } + + /** + * + */ + private static class ReduceSideAppendAlignFunction implements PairFunction<Tuple2<Long, FrameBlock>, Long, FrameBlock> + { + private static final long serialVersionUID = 5850400295183766409L; + + private long _rows; + + public ReduceSideAppendAlignFunction(long rows) { + _rows = rows; + } + + @Override + public Tuple2<Long,FrameBlock> call(Tuple2<Long, FrameBlock> arg0) + throws Exception + { + FrameBlock resultBlock = new FrameBlock(arg0._2().getSchema()); + + long index = (arg0._1()/OptimizerUtils.DEFAULT_FRAME_BLOCKSIZE)*OptimizerUtils.DEFAULT_FRAME_BLOCKSIZE+1; + int maxRows = (int) (_rows - index+1 >= OptimizerUtils.DEFAULT_FRAME_BLOCKSIZE?OptimizerUtils.DEFAULT_FRAME_BLOCKSIZE:_rows - index+1); + + resultBlock.ensureAllocatedColumns(maxRows); + resultBlock = resultBlock.leftIndexingOperations(arg0._2(), 0, maxRows-1, 0, arg0._2().getNumColumns()-1, new FrameBlock()); + + return new Tuple2<Long, FrameBlock>(index, resultBlock); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c22f239e/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixAppendMSPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixAppendMSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixAppendMSPInstruction.java new file mode 100644 
index 0000000..9e557bb --- /dev/null +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixAppendMSPInstruction.java @@ -0,0 +1,284 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysml.runtime.instructions.spark; + +import java.util.ArrayList; +import java.util.Iterator; + +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.function.PairFlatMapFunction; + +import scala.Tuple2; + +import org.apache.sysml.runtime.DMLRuntimeException; +import org.apache.sysml.runtime.controlprogram.context.ExecutionContext; +import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext; +import org.apache.sysml.runtime.functionobjects.OffsetColumnIndex; +import org.apache.sysml.runtime.instructions.InstructionUtils; +import org.apache.sysml.runtime.instructions.cp.CPOperand; +import org.apache.sysml.runtime.instructions.spark.data.LazyIterableIterator; +import org.apache.sysml.runtime.instructions.spark.data.PartitionedBroadcast; +import org.apache.sysml.runtime.instructions.spark.utils.SparkUtils; +import org.apache.sysml.runtime.matrix.MatrixCharacteristics; +import org.apache.sysml.runtime.matrix.data.MatrixBlock; +import 
org.apache.sysml.runtime.matrix.data.MatrixIndexes; +import org.apache.sysml.runtime.matrix.data.OperationsOnMatrixValues; +import org.apache.sysml.runtime.matrix.mapred.IndexedMatrixValue; +import org.apache.sysml.runtime.matrix.operators.Operator; +import org.apache.sysml.runtime.matrix.operators.ReorgOperator; + +public class MatrixAppendMSPInstruction extends AppendMSPInstruction +{ + public MatrixAppendMSPInstruction(Operator op, CPOperand in1, CPOperand in2, CPOperand offset, CPOperand out, boolean cbind, String opcode, String istr) + { + super(op, in1, in2, offset, out, cbind, opcode, istr); + } + + public static MatrixAppendMSPInstruction parseInstruction ( String str ) + throws DMLRuntimeException + { + String[] parts = InstructionUtils.getInstructionPartsWithValueType(str); + InstructionUtils.checkNumFields (parts, 5); + + String opcode = parts[0]; + CPOperand in1 = new CPOperand(parts[1]); + CPOperand in2 = new CPOperand(parts[2]); + CPOperand offset = new CPOperand(parts[3]); + CPOperand out = new CPOperand(parts[4]); + boolean cbind = Boolean.parseBoolean(parts[5]); + + if(!opcode.equalsIgnoreCase("mappend")) + throw new DMLRuntimeException("Unknown opcode while parsing a MatrixAppendMSPInstruction: " + str); + + return new MatrixAppendMSPInstruction( + new ReorgOperator(OffsetColumnIndex.getOffsetColumnIndexFnObject(-1)), + in1, in2, offset, out, cbind, opcode, str); + } + + @Override + public void processInstruction(ExecutionContext ec) + throws DMLRuntimeException + { + // map-only append (rhs must be vector and fit in mapper mem) + SparkExecutionContext sec = (SparkExecutionContext)ec; + checkBinaryAppendInputCharacteristics(sec, _cbind, false, false); + MatrixCharacteristics mc1 = sec.getMatrixCharacteristics(input1.getName()); + MatrixCharacteristics mc2 = sec.getMatrixCharacteristics(input2.getName()); + int brlen = mc1.getRowsPerBlock(); + int bclen = mc1.getColsPerBlock(); + + JavaPairRDD<MatrixIndexes,MatrixBlock> in1 = 
sec.getBinaryBlockRDDHandleForVariable( input1.getName() ); + PartitionedBroadcast<MatrixBlock> in2 = sec.getBroadcastForVariable( input2.getName() ); + long off = sec.getScalarInput( _offset.getName(), _offset.getValueType(), _offset.isLiteral()).getLongValue(); + + //execute map-append operations (partitioning preserving if #in-blocks = #out-blocks) + JavaPairRDD<MatrixIndexes,MatrixBlock> out = null; + if( preservesPartitioning(mc1, mc2, _cbind) ) { + out = in1.mapPartitionsToPair( + new MapSideAppendPartitionFunction(in2, _cbind, off, brlen, bclen), true); + } + else { + out = in1.flatMapToPair( + new MapSideAppendFunction(in2, _cbind, off, brlen, bclen)); + } + + //put output RDD handle into symbol table + updateBinaryAppendOutputMatrixCharacteristics(sec, _cbind); + sec.setRDDHandleForVariable(output.getName(), out); + sec.addLineageRDD(output.getName(), input1.getName()); + sec.addLineageBroadcast(output.getName(), input2.getName()); + } + + /** + * + * @param mcIn1 + * @param mcIn2 + * @return + */ + private boolean preservesPartitioning( MatrixCharacteristics mcIn1, MatrixCharacteristics mcIn2, boolean cbind ) + { + long ncblksIn1 = cbind ? + (long)Math.ceil((double)mcIn1.getCols()/mcIn1.getColsPerBlock()) : + (long)Math.ceil((double)mcIn1.getRows()/mcIn1.getRowsPerBlock()); + long ncblksOut = cbind ? 
+ (long)Math.ceil(((double)mcIn1.getCols()+mcIn2.getCols())/mcIn1.getColsPerBlock()) : + (long)Math.ceil(((double)mcIn1.getRows()+mcIn2.getRows())/mcIn1.getRowsPerBlock()); + + //mappend is partitioning-preserving if in-block append (e.g., common case of colvector append) + return (ncblksIn1 == ncblksOut); + } + + /** + * + */ + private static class MapSideAppendFunction implements PairFlatMapFunction<Tuple2<MatrixIndexes,MatrixBlock>, MatrixIndexes, MatrixBlock> + { + private static final long serialVersionUID = 2738541014432173450L; + + private PartitionedBroadcast<MatrixBlock> _pm = null; + private boolean _cbind = true; + private long _offset; + private int _brlen; + private int _bclen; + private long _lastBlockColIndex; + + public MapSideAppendFunction(PartitionedBroadcast<MatrixBlock> binput, boolean cbind, long offset, int brlen, int bclen) + { + _pm = binput; + _cbind = cbind; + + _offset = offset; + _brlen = brlen; + _bclen = bclen; + + //check for boundary block + int blen = cbind ? 
bclen : brlen; + _lastBlockColIndex = (long)Math.ceil((double)_offset/blen); + } + + @Override + public Iterable<Tuple2<MatrixIndexes, MatrixBlock>> call(Tuple2<MatrixIndexes, MatrixBlock> kv) + throws Exception + { + ArrayList<Tuple2<MatrixIndexes, MatrixBlock>> ret = new ArrayList<Tuple2<MatrixIndexes, MatrixBlock>>(); + + IndexedMatrixValue in1 = SparkUtils.toIndexedMatrixBlock(kv); + MatrixIndexes ix = in1.getIndexes(); + + //case 1: pass through of non-boundary blocks + if( (_cbind?ix.getColumnIndex():ix.getRowIndex())!=_lastBlockColIndex ) + { + ret.add( kv ); + } + //case 2: pass through full input block and rhs block + else if( _cbind && in1.getValue().getNumColumns() == _bclen + || !_cbind && in1.getValue().getNumRows() == _brlen) + { + //output lhs block + ret.add( kv ); + + //output shallow copy of rhs block + if( _cbind ) { + ret.add( new Tuple2<MatrixIndexes, MatrixBlock>( + new MatrixIndexes(ix.getRowIndex(), ix.getColumnIndex()+1), + _pm.getBlock((int)ix.getRowIndex(), 1)) ); + } + else { //rbind + ret.add( new Tuple2<MatrixIndexes, MatrixBlock>( + new MatrixIndexes(ix.getRowIndex()+1, ix.getColumnIndex()), + _pm.getBlock(1, (int)ix.getColumnIndex())) ); + } + } + //case 3: append operation on boundary block + else + { + //allocate space for the output value + ArrayList<IndexedMatrixValue> outlist=new ArrayList<IndexedMatrixValue>(2); + IndexedMatrixValue first = new IndexedMatrixValue(new MatrixIndexes(ix), new MatrixBlock()); + outlist.add(first); + + MatrixBlock value_in2 = null; + if( _cbind ) { + value_in2 = _pm.getBlock((int)ix.getRowIndex(), 1); + if(in1.getValue().getNumColumns()+value_in2.getNumColumns()>_bclen) { + IndexedMatrixValue second=new IndexedMatrixValue(new MatrixIndexes(), new MatrixBlock()); + second.getIndexes().setIndexes(ix.getRowIndex(), ix.getColumnIndex()+1); + outlist.add(second); + } + } + else { //rbind + value_in2 = _pm.getBlock(1, (int)ix.getColumnIndex()); + 
if(in1.getValue().getNumRows()+value_in2.getNumRows()>_brlen) { + IndexedMatrixValue second=new IndexedMatrixValue(new MatrixIndexes(), new MatrixBlock()); + second.getIndexes().setIndexes(ix.getRowIndex()+1, ix.getColumnIndex()); + outlist.add(second); + } + } + + OperationsOnMatrixValues.performAppend(in1.getValue(), value_in2, outlist, _brlen, _bclen, _cbind, true, 0); + ret.addAll(SparkUtils.fromIndexedMatrixBlock(outlist)); + } + + return ret; + } + } + + /** + * + */ + private static class MapSideAppendPartitionFunction implements PairFlatMapFunction<Iterator<Tuple2<MatrixIndexes,MatrixBlock>>, MatrixIndexes, MatrixBlock> + { + private static final long serialVersionUID = 5767240739761027220L; + + private PartitionedBroadcast<MatrixBlock> _pm = null; + private boolean _cbind = true; + private long _lastBlockColIndex = -1; + + public MapSideAppendPartitionFunction(PartitionedBroadcast<MatrixBlock> binput, boolean cbind, long offset, int brlen, int bclen) + { + _pm = binput; + _cbind = cbind; + + //check for boundary block + int blen = cbind ? bclen : brlen; + _lastBlockColIndex = (long)Math.ceil((double)offset/blen); + } + + @Override + public Iterable<Tuple2<MatrixIndexes, MatrixBlock>> call(Iterator<Tuple2<MatrixIndexes, MatrixBlock>> arg0) + throws Exception + { + return new MapAppendPartitionIterator(arg0); + } + + /** + * Lazy mappend iterator to prevent materialization of entire partition output in-memory. + * The implementation via mapPartitions is required to preserve partitioning information, + * which is important for performance. 
+ */ + private class MapAppendPartitionIterator extends LazyIterableIterator<Tuple2<MatrixIndexes, MatrixBlock>> + { + public MapAppendPartitionIterator(Iterator<Tuple2<MatrixIndexes, MatrixBlock>> in) { + super(in); + } + + @Override + protected Tuple2<MatrixIndexes, MatrixBlock> computeNext(Tuple2<MatrixIndexes, MatrixBlock> arg) + throws Exception + { + MatrixIndexes ix = arg._1(); + MatrixBlock in1 = arg._2(); + + //case 1: pass through of non-boundary blocks + if( (_cbind?ix.getColumnIndex():ix.getRowIndex()) != _lastBlockColIndex ) { + return arg; + } + //case 3: append operation on boundary block + else { + int rowix = _cbind ? (int)ix.getRowIndex() : 1; + int colix = _cbind ? 1 : (int)ix.getColumnIndex(); + MatrixBlock in2 = _pm.getBlock(rowix, colix); + MatrixBlock out = in1.appendOperations(in2, new MatrixBlock(), _cbind); + return new Tuple2<MatrixIndexes,MatrixBlock>(ix, out); + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c22f239e/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixAppendRSPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixAppendRSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixAppendRSPInstruction.java new file mode 100644 index 0000000..644fcd2 --- /dev/null +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixAppendRSPInstruction.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysml.runtime.instructions.spark; + +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.function.Function; + +import scala.Tuple2; + +import org.apache.sysml.runtime.DMLRuntimeException; +import org.apache.sysml.runtime.controlprogram.context.ExecutionContext; +import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext; +import org.apache.sysml.runtime.functionobjects.OffsetColumnIndex; +import org.apache.sysml.runtime.instructions.InstructionUtils; +import org.apache.sysml.runtime.instructions.cp.CPOperand; +import org.apache.sysml.runtime.matrix.data.MatrixBlock; +import org.apache.sysml.runtime.matrix.data.MatrixIndexes; +import org.apache.sysml.runtime.matrix.operators.Operator; +import org.apache.sysml.runtime.matrix.operators.ReorgOperator; + +public class MatrixAppendRSPInstruction extends AppendRSPInstruction +{ + public MatrixAppendRSPInstruction(Operator op, CPOperand in1, CPOperand in2, CPOperand out, boolean cbind, String opcode, String istr) + { + super(op, in1, in2, out, cbind, opcode, istr); + } + + public static MatrixAppendRSPInstruction parseInstruction ( String str ) + throws DMLRuntimeException + { + String[] parts = InstructionUtils.getInstructionPartsWithValueType(str); + InstructionUtils.checkNumFields (parts, 4); + + String opcode = parts[0]; + CPOperand in1 = new CPOperand(parts[1]); + CPOperand in2 = new CPOperand(parts[2]); + CPOperand out = new CPOperand(parts[3]); + boolean cbind = Boolean.parseBoolean(parts[4]); + + 
if(!opcode.equalsIgnoreCase("rappend")) + throw new DMLRuntimeException("Unknown opcode while parsing a MatrixAppendRSPInstruction: " + str); + + return new MatrixAppendRSPInstruction( + new ReorgOperator(OffsetColumnIndex.getOffsetColumnIndexFnObject(-1)), + in1, in2, out, cbind, opcode, str); + } + + @Override + public void processInstruction(ExecutionContext ec) + throws DMLRuntimeException + { + // reduce-only append (output must have at most one column block) + SparkExecutionContext sec = (SparkExecutionContext)ec; + checkBinaryAppendInputCharacteristics(sec, _cbind, true, false); + + JavaPairRDD<MatrixIndexes,MatrixBlock> in1 = sec.getBinaryBlockRDDHandleForVariable( input1.getName() ); + JavaPairRDD<MatrixIndexes,MatrixBlock> in2 = sec.getBinaryBlockRDDHandleForVariable( input2.getName() ); + + //execute reduce-append operations (partitioning preserving) + JavaPairRDD<MatrixIndexes,MatrixBlock> out = in1 + .join(in2) + .mapValues(new ReduceSideAppendFunction(_cbind)); + + //put output RDD handle into symbol table + updateBinaryAppendOutputMatrixCharacteristics(sec, _cbind); + sec.setRDDHandleForVariable(output.getName(), out); + sec.addLineageRDD(output.getName(), input1.getName()); + sec.addLineageRDD(output.getName(), input2.getName()); + } + + /** + * + */ + private static class ReduceSideAppendFunction implements Function<Tuple2<MatrixBlock, MatrixBlock>, MatrixBlock> + { + private static final long serialVersionUID = -6763904972560309095L; + + private boolean _cbind = true; + + public ReduceSideAppendFunction(boolean cbind) { + _cbind = cbind; + } + + @Override + public MatrixBlock call(Tuple2<MatrixBlock, MatrixBlock> arg0) + throws Exception + { + MatrixBlock left = arg0._1(); + MatrixBlock right = arg0._2(); + + return left.appendOperations(right, new MatrixBlock(), _cbind); + } + } +} + http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c22f239e/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java 
---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java b/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java index 64bc6fe..a4f826c 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java @@ -777,6 +777,8 @@ public class FrameBlock implements Writable, CacheBlock, Externalizable ret._schema.addAll(that._schema); ret._colnames = new ArrayList<String>(_colnames); ret._colnames.addAll(that._colnames); + ret._colmeta = new ArrayList<ColumnMetadata>(_colmeta); + ret._colmeta.addAll(that._colmeta); //concatenate column data (w/ deep copy to prevent side effects) for( Array tmp : _coldata ) http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c22f239e/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java b/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java index 6bce4ff..fa17fcd 100644 --- a/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java +++ b/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java @@ -23,7 +23,10 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import org.apache.sysml.parser.Expression.DataType; import org.apache.sysml.parser.Expression.ValueType; +import org.apache.sysml.runtime.instructions.InstructionUtils; +import org.apache.sysml.runtime.instructions.cp.CPOperand; import org.apache.sysml.runtime.matrix.data.FrameBlock; import org.apache.sysml.runtime.matrix.data.MatrixIndexes; import org.apache.sysml.runtime.matrix.data.NumItemsByEachReducerMetaData; @@ -596,4 +599,20 @@ public class UtilFunctions return Arrays.asList(schema); } + + + /* + * This function will return datatype, if its Matrix or Frame + * + * @param str 
+ * Instruction string to execute + */ + + public static DataType getDataType(String str, int index) + { + String[] parts = InstructionUtils.getInstructionPartsWithValueType(str); + CPOperand in1 = new CPOperand(parts[index]); + + return in1.getDataType(); + } } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c22f239e/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameAppendDistTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameAppendDistTest.java b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameAppendDistTest.java new file mode 100644 index 0000000..20c4a27 --- /dev/null +++ b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameAppendDistTest.java @@ -0,0 +1,226 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.sysml.test.integration.functions.frame; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import org.junit.Assert; +import org.junit.Test; +import org.apache.sysml.api.DMLScript; +import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM; +import org.apache.sysml.hops.BinaryOp; +import org.apache.sysml.hops.BinaryOp.AppendMethod; +import org.apache.sysml.parser.Expression.ValueType; +import org.apache.sysml.runtime.matrix.MatrixCharacteristics; +import org.apache.sysml.runtime.matrix.data.FrameBlock; +import org.apache.sysml.runtime.matrix.data.InputInfo; +import org.apache.sysml.runtime.matrix.data.OutputInfo; +import org.apache.sysml.runtime.util.UtilFunctions; +import org.apache.sysml.test.integration.AutomatedTestBase; +import org.apache.sysml.test.integration.TestConfiguration; +import org.apache.sysml.test.utils.TestUtils; + +public class FrameAppendDistTest extends AutomatedTestBase +{ + private final static String TEST_NAME = "FrameAppend"; + private final static String TEST_DIR = "functions/frame/"; + private final static String TEST_CLASS_DIR = TEST_DIR + FrameAppendDistTest.class.getSimpleName() + "/"; + + private final static double epsilon=0.0000000001; + private final static int min=1; + private final static int max=100; + + private final static int rows1 = 1692; + private final static int rows2 = 1192; + //usecase a: inblock single + private final static int cols1a = 375; + private final static int cols2a = 92; + //usecase b: inblock multiple + private final static int cols1b = 1059; + //usecase c: outblock blocksize + private final static int cols1d = 1460; + private final static int cols3d = 990; + + + private final static double sparsity1 = 0.5; + private final static double sparsity2 = 0.01; + + @Override + public void setUp() { + TestUtils.clearAssertionInformation(); + addTestConfiguration(TEST_NAME, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME, + new 
String[] {"C"})); + } + + @Test + public void testAppendInBlock1DenseSP() { + commonAppendTest(RUNTIME_PLATFORM.SPARK, rows1, rows1, cols1a, cols2a, false, AppendMethod.MR_RAPPEND, false); + } + + @Test + public void testAppendInBlock1SparseSP() { + commonAppendTest(RUNTIME_PLATFORM.SPARK, rows1, rows1, cols1a, cols2a, true, AppendMethod.MR_RAPPEND, false); + } + + @Test + public void testAppendInBlock1DenseRBindSP() { + commonAppendTest(RUNTIME_PLATFORM.SPARK, rows1, rows2, cols1a, cols1a, false, AppendMethod.MR_RAPPEND, true); + } + + @Test + public void testAppendInBlock1SparseRBindSP() { + commonAppendTest(RUNTIME_PLATFORM.SPARK, rows1, rows1, cols1a, cols1a, true, AppendMethod.MR_RAPPEND, true); + } + + //NOTE: mappend only applied for m2_cols<=blocksize + @Test + public void testMapAppendInBlock2DenseSP() { + commonAppendTest(RUNTIME_PLATFORM.SPARK, rows1, rows1, cols1b, cols2a, false, AppendMethod.MR_MAPPEND, false); + } + + @Test + public void testMapAppendInBlock2SparseSP() { + commonAppendTest(RUNTIME_PLATFORM.SPARK, rows1, rows1, cols1b, cols2a, true, AppendMethod.MR_MAPPEND, false); + } + + @Test + public void testMapAppendOutBlock2DenseSP() { + commonAppendTest(RUNTIME_PLATFORM.SPARK, rows1, rows1, cols1d, cols3d, false, AppendMethod.MR_MAPPEND, false); + } + + @Test + public void testMapAppendOutBlock2SparseSP() { + commonAppendTest(RUNTIME_PLATFORM.SPARK, rows1, rows1, cols1d, cols3d, true, AppendMethod.MR_MAPPEND, false); + } + + /** + * + * @param platform + * @param rows + * @param cols1 + * @param cols2 + * @param sparse + */ + public void commonAppendTest(RUNTIME_PLATFORM platform, int rows1, int rows2, int cols1, int cols2, boolean sparse, AppendMethod forcedAppendMethod, boolean rbind) + { + TestConfiguration config = getAndLoadTestConfiguration(TEST_NAME); + + RUNTIME_PLATFORM prevPlfm=rtplatform; + + double sparsity = (sparse) ? 
sparsity2 : sparsity1; + boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG; + + try + { + if(forcedAppendMethod != null) { + BinaryOp.FORCED_APPEND_METHOD = forcedAppendMethod; + } + rtplatform = platform; + if( rtplatform == RUNTIME_PLATFORM.SPARK ) + DMLScript.USE_LOCAL_SPARK_CONFIG = true; + + config.addVariable("rows", rows1); + config.addVariable("cols", cols1); + + /* This is for running the junit test the new way, i.e., construct the arguments directly */ + String RI_HOME = SCRIPT_DIR + TEST_DIR; + fullDMLScriptName = RI_HOME + TEST_NAME + ".dml"; + programArgs = new String[]{"-explain","-args", input("A"), + Long.toString(rows1), + Long.toString(cols1), + input("B"), + Long.toString(rows2), + Long.toString(cols2), + output("C"), + (rbind? "rbind": "cbind")}; + fullRScriptName = RI_HOME + TEST_NAME + ".R"; + rCmd = "Rscript" + " " + fullRScriptName + " " + + inputDir() + " " + expectedDir() + " " + (rbind? "rbind": "cbind"); + + //initialize the frame data. + List<ValueType> lschemaA = Arrays.asList(genMixSchema(cols1)); + double[][] A = getRandomMatrix(rows1, cols1, min, max, sparsity, 1111 /*\\System.currentTimeMillis()*/); + writeInputFrameWithMTD("A", A, true, lschemaA, OutputInfo.BinaryBlockOutputInfo); + + List<ValueType> lschemaB = Arrays.asList(genMixSchema(cols2)); + double[][] B = getRandomMatrix(rows2, cols2, min, max, sparsity, 2345 /*\\System.currentTimeMillis()*/); + writeInputFrameWithMTD("B", B, true, lschemaB, OutputInfo.BinaryBlockOutputInfo); + + boolean exceptionExpected = false; + int expectedNumberOfJobs = -1; + runTest(true, exceptionExpected, null, expectedNumberOfJobs); + runRScript(true); + + List<ValueType> lschemaAB = new ArrayList<ValueType>(lschemaA); + lschemaAB.addAll(lschemaB); + + for(String file: config.getOutputFiles()) + { + FrameBlock frameBlock = readDMLFrameFromHDFS(file, InputInfo.BinaryBlockInputInfo); + MatrixCharacteristics md = new MatrixCharacteristics(frameBlock.getNumRows(), frameBlock.getNumColumns(), 
-1, -1); + FrameBlock frameRBlock = readRFrameFromHDFS(file+".csv", InputInfo.CSVInputInfo, md); + verifyFrameData(frameBlock, frameRBlock, (ValueType[]) lschemaAB.toArray(new ValueType[0])); + System.out.println("File processed is " + file); + } + } + catch(Exception ex) { + ex.printStackTrace(); + throw new RuntimeException(ex); + } + finally + { + //reset execution platform + rtplatform = prevPlfm; + DMLScript.USE_LOCAL_SPARK_CONFIG = sparkConfigOld; + BinaryOp.FORCED_APPEND_METHOD = null; + } + } + + ValueType[] genMixSchema(int cols) + { + List<ValueType> schemaMixedLargeListStr = Collections.nCopies(cols/4, ValueType.STRING); + List<ValueType> schemaMixedLargeListDble = Collections.nCopies(cols/4, ValueType.DOUBLE); + List<ValueType> schemaMixedLargeListInt = Collections.nCopies(cols/4, ValueType.INT); + List<ValueType> schemaMixedLargeListBool = Collections.nCopies(cols-(cols/4)*3, ValueType.BOOLEAN); + + final List<ValueType> schemaMixedLargeList = new ArrayList<ValueType>(schemaMixedLargeListStr); + schemaMixedLargeList.addAll(schemaMixedLargeListDble); + schemaMixedLargeList.addAll(schemaMixedLargeListInt); + schemaMixedLargeList.addAll(schemaMixedLargeListBool); + ValueType[] schemaMixedLarge = new ValueType[schemaMixedLargeList.size()]; + schemaMixedLarge = (ValueType[]) schemaMixedLargeList.toArray(schemaMixedLarge); + + return schemaMixedLarge; + } + + private void verifyFrameData(FrameBlock frame1, FrameBlock frame2, ValueType[] schema) { + for ( int i=0; i<frame1.getNumRows(); ++i ) + for( int j=0; j<frame1.getNumColumns(); j++ ) { + Object val1 = UtilFunctions.stringToObject(schema[j], UtilFunctions.objectToString(frame1.get(i, j))); + Object val2 = UtilFunctions.stringToObject(schema[j], UtilFunctions.objectToString(frame2.get(i, j))); + if( TestUtils.compareToR(schema[j], val1, val2, epsilon) != 0) + Assert.fail("The DML data for cell ("+ i + "," + j + ") is " + val1 + + ", not same as the R value " + val2); + } + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c22f239e/src/test/scripts/functions/frame/FrameAppend.R ---------------------------------------------------------------------- diff --git a/src/test/scripts/functions/frame/FrameAppend.R b/src/test/scripts/functions/frame/FrameAppend.R new file mode 100644 index 0000000..f97916d --- /dev/null +++ b/src/test/scripts/functions/frame/FrameAppend.R @@ -0,0 +1,33 @@ +#------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
#
#-------------------------------------------------------------

# Reference implementation for the frame append test.
# Arguments: [1] input directory prefix, [2] output directory prefix,
#            [3] append mode ("rbind", anything else means column append).
args <- commandArgs(TRUE)
options(digits=22)
library("Matrix")

inputPrefix <- args[1]
outputPrefix <- args[2]

# Both inputs are headerless CSV files; strings are kept as strings.
A <- read.csv(paste0(inputPrefix, "A.csv"), header = FALSE, stringsAsFactors=FALSE)
B <- read.csv(paste0(inputPrefix, "B.csv"), header = FALSE, stringsAsFactors=FALSE)

# Stack rows in rbind mode, otherwise append the columns of B to A.
C <- if (args[3] == "rbind") rbind(A, B) else cbind2(A, B)

write.csv(C, paste0(outputPrefix, "C.csv"), row.names = FALSE, quote = FALSE)
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c22f239e/src/test/scripts/functions/frame/FrameAppend.dml
----------------------------------------------------------------------
diff --git a/src/test/scripts/functions/frame/FrameAppend.dml b/src/test/scripts/functions/frame/FrameAppend.dml
new file mode 100644
index 0000000..eea118e
--- /dev/null
+++ b/src/test/scripts/functions/frame/FrameAppend.dml
@@ -0,0 +1,29 @@
#-------------------------------------------------------------
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
#-------------------------------------------------------------

# Appends two binary-format frames and writes the result as a binary frame.
# Arguments: $1 path of frame A, $2 rows of A, $3 columns of A,
#            $4 path of frame B, $5 rows of B, $6 columns of B,
#            $7 output path, $8 append mode ("rbind", anything else means cbind).
A=read($1, data_type="frame", rows=$2, cols=$3, format="binary")
B=read($4, data_type="frame", rows=$5, cols=$6, format="binary")
# Select the append direction from the mode argument.
if ($8 == "rbind") {
  C=rbind(A, B)
} else {
  C=cbind(A, B)
}
write(C, $7, format="binary")
