[SYSTEMML-928] Fix Spark append compilation for frames (only m/rappend). For the Spark execution type, we support mappend, rappend, galignedappend, and gappend for matrices, but only mappend and rappend for frames. This patch makes the related operator selection respect this restriction. Furthermore, it includes various minor runtime cleanups of the append instructions and frame converters.
Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/8d5f3cea Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/8d5f3cea Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/8d5f3cea Branch: refs/heads/master Commit: 8d5f3ceab60beb225f2f62882b772968c1ef29c5 Parents: 8631a14 Author: Matthias Boehm <mbo...@us.ibm.com> Authored: Fri Sep 16 23:17:40 2016 -0700 Committer: Matthias Boehm <mbo...@us.ibm.com> Committed: Sat Sep 17 00:25:28 2016 -0700 ---------------------------------------------------------------------- .../java/org/apache/sysml/hops/BinaryOp.java | 7 ++- .../instructions/SPInstructionParser.java | 26 ++++------- .../spark/AppendMSPInstruction.java | 30 +++++++++++++ .../spark/AppendRSPInstruction.java | 30 +++++++++++++ .../spark/FrameAppendMSPInstruction.java | 24 ----------- .../spark/FrameAppendRSPInstruction.java | 23 ---------- .../spark/MatrixAppendMSPInstruction.java | 24 ----------- .../spark/MatrixAppendRSPInstruction.java | 23 ---------- .../spark/utils/FrameRDDConverterUtils.java | 45 +++++++++----------- 9 files changed, 94 insertions(+), 138 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/8d5f3cea/src/main/java/org/apache/sysml/hops/BinaryOp.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/hops/BinaryOp.java b/src/main/java/org/apache/sysml/hops/BinaryOp.java index edc327d..ab0315b 100644 --- a/src/main/java/org/apache/sysml/hops/BinaryOp.java +++ b/src/main/java/org/apache/sysml/hops/BinaryOp.java @@ -1302,12 +1302,15 @@ public class BinaryOp extends Hop if( cbind && m1_dim2 >= 1 && m2_dim2 >= 0 //column dims known && m1_dim2+m2_dim2 <= m1_cpb //output has one column block ||!cbind && m1_dim1 >= 1 && m2_dim1 >= 0 //row dims known - && m1_dim1+m2_dim1 
<= m1_rpb ) //output has one column block + && m1_dim1+m2_dim1 <= m1_rpb //output has one column block + || dt == DataType.FRAME ) { return AppendMethod.MR_RAPPEND; } - // if(mc1.getCols() % mc1.getColsPerBlock() == 0) { + //note: below append methods are only supported for matrix, not frame + + //special case of block-aligned append line if( cbind && m1_dim2 % m1_cpb == 0 || !cbind && m1_dim1 % m1_rpb == 0 ) { http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/8d5f3cea/src/main/java/org/apache/sysml/runtime/instructions/SPInstructionParser.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/SPInstructionParser.java b/src/main/java/org/apache/sysml/runtime/instructions/SPInstructionParser.java index c74b44e..a244288 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/SPInstructionParser.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/SPInstructionParser.java @@ -34,12 +34,13 @@ import org.apache.sysml.lops.WeightedSquaredLoss; import org.apache.sysml.lops.WeightedSquaredLossR; import org.apache.sysml.lops.WeightedUnaryMM; import org.apache.sysml.lops.WeightedUnaryMMR; -import org.apache.sysml.parser.Expression.DataType; import org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.instructions.spark.AggregateTernarySPInstruction; import org.apache.sysml.runtime.instructions.spark.AggregateUnarySPInstruction; import org.apache.sysml.runtime.instructions.spark.AppendGAlignedSPInstruction; import org.apache.sysml.runtime.instructions.spark.AppendGSPInstruction; +import org.apache.sysml.runtime.instructions.spark.AppendMSPInstruction; +import org.apache.sysml.runtime.instructions.spark.AppendRSPInstruction; import org.apache.sysml.runtime.instructions.spark.ArithmeticBinarySPInstruction; import org.apache.sysml.runtime.instructions.spark.BinUaggChainSPInstruction; import 
org.apache.sysml.runtime.instructions.spark.BuiltinBinarySPInstruction; @@ -53,13 +54,9 @@ import org.apache.sysml.runtime.instructions.spark.CovarianceSPInstruction; import org.apache.sysml.runtime.instructions.spark.CpmmSPInstruction; import org.apache.sysml.runtime.instructions.spark.CumulativeAggregateSPInstruction; import org.apache.sysml.runtime.instructions.spark.CumulativeOffsetSPInstruction; -import org.apache.sysml.runtime.instructions.spark.FrameAppendMSPInstruction; -import org.apache.sysml.runtime.instructions.spark.FrameAppendRSPInstruction; import org.apache.sysml.runtime.instructions.spark.IndexingSPInstruction; import org.apache.sysml.runtime.instructions.spark.MapmmChainSPInstruction; import org.apache.sysml.runtime.instructions.spark.MapmmSPInstruction; -import org.apache.sysml.runtime.instructions.spark.MatrixAppendMSPInstruction; -import org.apache.sysml.runtime.instructions.spark.MatrixAppendRSPInstruction; import org.apache.sysml.runtime.instructions.spark.MatrixReshapeSPInstruction; import org.apache.sysml.runtime.instructions.spark.MultiReturnParameterizedBuiltinSPInstruction; import org.apache.sysml.runtime.instructions.spark.PMapmmSPInstruction; @@ -81,7 +78,6 @@ import org.apache.sysml.runtime.instructions.spark.QuantileSortSPInstruction; import org.apache.sysml.runtime.instructions.spark.UaggOuterChainSPInstruction; import org.apache.sysml.runtime.instructions.spark.WriteSPInstruction; import org.apache.sysml.runtime.instructions.spark.ZipmmSPInstruction; -import org.apache.sysml.runtime.util.UtilFunctions; public class SPInstructionParser extends InstructionParser @@ -392,24 +388,18 @@ public class SPInstructionParser extends InstructionParser case MatrixReshape: return MatrixReshapeSPInstruction.parseInstruction(str); - case MAppend: - if(UtilFunctions.getDataType(str, 1) == DataType.MATRIX) - return MatrixAppendMSPInstruction.parseInstruction(str); - else - return FrameAppendMSPInstruction.parseInstruction(str); + case MAppend: 
//matrix/frame + return AppendMSPInstruction.parseInstruction(str); + + case RAppend: //matrix/frame + return AppendRSPInstruction.parseInstruction(str); - case GAppend: + case GAppend: return AppendGSPInstruction.parseInstruction(str); case GAlignedAppend: return AppendGAlignedSPInstruction.parseInstruction(str); - case RAppend: - if(UtilFunctions.getDataType(str, 1) == DataType.MATRIX) - return MatrixAppendRSPInstruction.parseInstruction(str); - else - return FrameAppendRSPInstruction.parseInstruction(str); - case Rand: return RandSPInstruction.parseInstruction(str); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/8d5f3cea/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendMSPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendMSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendMSPInstruction.java index eaf23d5..ab70af2 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendMSPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendMSPInstruction.java @@ -20,8 +20,12 @@ package org.apache.sysml.runtime.instructions.spark; +import org.apache.sysml.runtime.DMLRuntimeException; +import org.apache.sysml.runtime.functionobjects.OffsetColumnIndex; +import org.apache.sysml.runtime.instructions.InstructionUtils; import org.apache.sysml.runtime.instructions.cp.CPOperand; import org.apache.sysml.runtime.matrix.operators.Operator; +import org.apache.sysml.runtime.matrix.operators.ReorgOperator; public abstract class AppendMSPInstruction extends BinarySPInstruction { @@ -36,4 +40,30 @@ public abstract class AppendMSPInstruction extends BinarySPInstruction _cbind = cbind; } + public static AppendMSPInstruction parseInstruction( String str ) + throws DMLRuntimeException + { + String[] parts = InstructionUtils.getInstructionPartsWithValueType(str); + 
InstructionUtils.checkNumFields (parts, 5); + + String opcode = parts[0]; + CPOperand in1 = new CPOperand(parts[1]); + CPOperand in2 = new CPOperand(parts[2]); + CPOperand offset = new CPOperand(parts[3]); + CPOperand out = new CPOperand(parts[4]); + boolean cbind = Boolean.parseBoolean(parts[5]); + + if(!opcode.equalsIgnoreCase("mappend")) + throw new DMLRuntimeException("Unknown opcode while parsing a AppendMSPInstruction: " + str); + + //construct matrix/frame appendm instruction + if( in1.getDataType().isMatrix() ) { + return new MatrixAppendMSPInstruction(new ReorgOperator(OffsetColumnIndex + .getOffsetColumnIndexFnObject(-1)), in1, in2, offset, out, cbind, opcode, str); + } + else { //frame + return new FrameAppendMSPInstruction(new ReorgOperator(OffsetColumnIndex + .getOffsetColumnIndexFnObject(-1)), in1, in2, offset, out, cbind, opcode, str); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/8d5f3cea/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendRSPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendRSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendRSPInstruction.java index 6d3cf5e..b56b7d7 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendRSPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendRSPInstruction.java @@ -19,8 +19,12 @@ package org.apache.sysml.runtime.instructions.spark; +import org.apache.sysml.runtime.DMLRuntimeException; +import org.apache.sysml.runtime.functionobjects.OffsetColumnIndex; +import org.apache.sysml.runtime.instructions.InstructionUtils; import org.apache.sysml.runtime.instructions.cp.CPOperand; import org.apache.sysml.runtime.matrix.operators.Operator; +import org.apache.sysml.runtime.matrix.operators.ReorgOperator; public abstract class AppendRSPInstruction extends 
BinarySPInstruction @@ -33,5 +37,31 @@ public abstract class AppendRSPInstruction extends BinarySPInstruction _sptype = SPINSTRUCTION_TYPE.RAppend; _cbind = cbind; } + + public static AppendRSPInstruction parseInstruction ( String str ) + throws DMLRuntimeException + { + String[] parts = InstructionUtils.getInstructionPartsWithValueType(str); + InstructionUtils.checkNumFields (parts, 4); + + String opcode = parts[0]; + CPOperand in1 = new CPOperand(parts[1]); + CPOperand in2 = new CPOperand(parts[2]); + CPOperand out = new CPOperand(parts[3]); + boolean cbind = Boolean.parseBoolean(parts[4]); + + if(!opcode.equalsIgnoreCase("rappend")) + throw new DMLRuntimeException("Unknown opcode while parsing a MatrixAppendRSPInstruction: " + str); + + if( in1.getDataType().isMatrix() ) { + return new MatrixAppendRSPInstruction(new ReorgOperator(OffsetColumnIndex + .getOffsetColumnIndexFnObject(-1)), in1, in2, out, cbind, opcode, str); + } + else { //frame + + return new FrameAppendRSPInstruction(new ReorgOperator(OffsetColumnIndex + .getOffsetColumnIndexFnObject(-1)), in1, in2, out, cbind, opcode, str); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/8d5f3cea/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameAppendMSPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameAppendMSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameAppendMSPInstruction.java index b67f364..7aad0bf 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameAppendMSPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameAppendMSPInstruction.java @@ -30,14 +30,11 @@ import org.apache.sysml.hops.OptimizerUtils; import org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.controlprogram.context.ExecutionContext; import 
org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext; -import org.apache.sysml.runtime.functionobjects.OffsetColumnIndex; -import org.apache.sysml.runtime.instructions.InstructionUtils; import org.apache.sysml.runtime.instructions.cp.CPOperand; import org.apache.sysml.runtime.instructions.spark.data.LazyIterableIterator; import org.apache.sysml.runtime.instructions.spark.data.PartitionedBroadcast; import org.apache.sysml.runtime.matrix.data.FrameBlock; import org.apache.sysml.runtime.matrix.operators.Operator; -import org.apache.sysml.runtime.matrix.operators.ReorgOperator; public class FrameAppendMSPInstruction extends AppendMSPInstruction { @@ -46,27 +43,6 @@ public class FrameAppendMSPInstruction extends AppendMSPInstruction super(op, in1, in2, offset, out, cbind, opcode, istr); } - public static FrameAppendMSPInstruction parseInstruction ( String str ) - throws DMLRuntimeException - { - String[] parts = InstructionUtils.getInstructionPartsWithValueType(str); - InstructionUtils.checkNumFields (parts, 5); - - String opcode = parts[0]; - CPOperand in1 = new CPOperand(parts[1]); - CPOperand in2 = new CPOperand(parts[2]); - CPOperand offset = new CPOperand(parts[3]); - CPOperand out = new CPOperand(parts[4]); - boolean cbind = Boolean.parseBoolean(parts[5]); - - if(!opcode.equalsIgnoreCase("mappend")) - throw new DMLRuntimeException("Unknown opcode while parsing a FrameAppendMSPInstruction: " + str); - - return new FrameAppendMSPInstruction( - new ReorgOperator(OffsetColumnIndex.getOffsetColumnIndexFnObject(-1)), - in1, in2, offset, out, cbind, opcode, str); - } - @Override public void processInstruction(ExecutionContext ec) throws DMLRuntimeException http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/8d5f3cea/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameAppendRSPInstruction.java ---------------------------------------------------------------------- diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameAppendRSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameAppendRSPInstruction.java index c627e40..067769d 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameAppendRSPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameAppendRSPInstruction.java @@ -29,13 +29,10 @@ import org.apache.sysml.hops.OptimizerUtils; import org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.controlprogram.context.ExecutionContext; import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext; -import org.apache.sysml.runtime.functionobjects.OffsetColumnIndex; -import org.apache.sysml.runtime.instructions.InstructionUtils; import org.apache.sysml.runtime.instructions.cp.CPOperand; import org.apache.sysml.runtime.instructions.spark.utils.RDDAggregateUtils; import org.apache.sysml.runtime.matrix.data.FrameBlock; import org.apache.sysml.runtime.matrix.operators.Operator; -import org.apache.sysml.runtime.matrix.operators.ReorgOperator; public class FrameAppendRSPInstruction extends AppendRSPInstruction { @@ -43,26 +40,6 @@ public class FrameAppendRSPInstruction extends AppendRSPInstruction { super(op, in1, in2, out, cbind, opcode, istr); } - - public static FrameAppendRSPInstruction parseInstruction ( String str ) - throws DMLRuntimeException - { - String[] parts = InstructionUtils.getInstructionPartsWithValueType(str); - InstructionUtils.checkNumFields (parts, 4); - - String opcode = parts[0]; - CPOperand in1 = new CPOperand(parts[1]); - CPOperand in2 = new CPOperand(parts[2]); - CPOperand out = new CPOperand(parts[3]); - boolean cbind = Boolean.parseBoolean(parts[4]); - - if(!opcode.equalsIgnoreCase("rappend")) - throw new DMLRuntimeException("Unknown opcode while parsing a FrameAppendRSPInstruction: " + str); - - return new FrameAppendRSPInstruction( - new 
ReorgOperator(OffsetColumnIndex.getOffsetColumnIndexFnObject(-1)), - in1, in2, out, cbind, opcode, str); - } @Override public void processInstruction(ExecutionContext ec) http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/8d5f3cea/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixAppendMSPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixAppendMSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixAppendMSPInstruction.java index 9e557bb..c4cd548 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixAppendMSPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixAppendMSPInstruction.java @@ -30,8 +30,6 @@ import scala.Tuple2; import org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.controlprogram.context.ExecutionContext; import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext; -import org.apache.sysml.runtime.functionobjects.OffsetColumnIndex; -import org.apache.sysml.runtime.instructions.InstructionUtils; import org.apache.sysml.runtime.instructions.cp.CPOperand; import org.apache.sysml.runtime.instructions.spark.data.LazyIterableIterator; import org.apache.sysml.runtime.instructions.spark.data.PartitionedBroadcast; @@ -42,7 +40,6 @@ import org.apache.sysml.runtime.matrix.data.MatrixIndexes; import org.apache.sysml.runtime.matrix.data.OperationsOnMatrixValues; import org.apache.sysml.runtime.matrix.mapred.IndexedMatrixValue; import org.apache.sysml.runtime.matrix.operators.Operator; -import org.apache.sysml.runtime.matrix.operators.ReorgOperator; public class MatrixAppendMSPInstruction extends AppendMSPInstruction { @@ -51,27 +48,6 @@ public class MatrixAppendMSPInstruction extends AppendMSPInstruction super(op, in1, in2, offset, out, cbind, opcode, istr); } - public static 
MatrixAppendMSPInstruction parseInstruction ( String str ) - throws DMLRuntimeException - { - String[] parts = InstructionUtils.getInstructionPartsWithValueType(str); - InstructionUtils.checkNumFields (parts, 5); - - String opcode = parts[0]; - CPOperand in1 = new CPOperand(parts[1]); - CPOperand in2 = new CPOperand(parts[2]); - CPOperand offset = new CPOperand(parts[3]); - CPOperand out = new CPOperand(parts[4]); - boolean cbind = Boolean.parseBoolean(parts[5]); - - if(!opcode.equalsIgnoreCase("mappend")) - throw new DMLRuntimeException("Unknown opcode while parsing a MatrixAppendMSPInstruction: " + str); - - return new MatrixAppendMSPInstruction( - new ReorgOperator(OffsetColumnIndex.getOffsetColumnIndexFnObject(-1)), - in1, in2, offset, out, cbind, opcode, str); - } - @Override public void processInstruction(ExecutionContext ec) throws DMLRuntimeException http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/8d5f3cea/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixAppendRSPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixAppendRSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixAppendRSPInstruction.java index 644fcd2..779e5ab 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixAppendRSPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixAppendRSPInstruction.java @@ -27,13 +27,10 @@ import scala.Tuple2; import org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.controlprogram.context.ExecutionContext; import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext; -import org.apache.sysml.runtime.functionobjects.OffsetColumnIndex; -import org.apache.sysml.runtime.instructions.InstructionUtils; import org.apache.sysml.runtime.instructions.cp.CPOperand; import 
org.apache.sysml.runtime.matrix.data.MatrixBlock; import org.apache.sysml.runtime.matrix.data.MatrixIndexes; import org.apache.sysml.runtime.matrix.operators.Operator; -import org.apache.sysml.runtime.matrix.operators.ReorgOperator; public class MatrixAppendRSPInstruction extends AppendRSPInstruction { @@ -42,26 +39,6 @@ public class MatrixAppendRSPInstruction extends AppendRSPInstruction super(op, in1, in2, out, cbind, opcode, istr); } - public static MatrixAppendRSPInstruction parseInstruction ( String str ) - throws DMLRuntimeException - { - String[] parts = InstructionUtils.getInstructionPartsWithValueType(str); - InstructionUtils.checkNumFields (parts, 4); - - String opcode = parts[0]; - CPOperand in1 = new CPOperand(parts[1]); - CPOperand in2 = new CPOperand(parts[2]); - CPOperand out = new CPOperand(parts[3]); - boolean cbind = Boolean.parseBoolean(parts[4]); - - if(!opcode.equalsIgnoreCase("rappend")) - throw new DMLRuntimeException("Unknown opcode while parsing a MatrixAppendRSPInstruction: " + str); - - return new MatrixAppendRSPInstruction( - new ReorgOperator(OffsetColumnIndex.getOffsetColumnIndexFnObject(-1)), - in1, in2, out, cbind, opcode, str); - } - @Override public void processInstruction(ExecutionContext ec) throws DMLRuntimeException http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/8d5f3cea/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java index 3ac1daf..351d559 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java @@ -160,7 +160,8 @@ public class FrameRDDConverterUtils * @param 
strict * @return */ - public static JavaRDD<String> binaryBlockToCsv(JavaPairRDD<Long,FrameBlock> in, MatrixCharacteristics mcIn, CSVFileFormatProperties props, boolean strict) + public static JavaRDD<String> binaryBlockToCsv(JavaPairRDD<Long,FrameBlock> in, + MatrixCharacteristics mcIn, CSVFileFormatProperties props, boolean strict) { JavaPairRDD<Long,FrameBlock> input = in; @@ -193,16 +194,12 @@ public class FrameRDDConverterUtils JavaPairRDD<LongWritable, Text> in, MatrixCharacteristics mcOut, List<ValueType> schema ) throws DMLRuntimeException { - //replicate schema entry if necessary - List<ValueType> lschema = (schema.size()==1 && mcOut.getCols()>1) ? - Collections.nCopies((int)mcOut.getCols(), schema.get(0)) : schema; - //convert input rdd to serializable long/frame block JavaPairRDD<Long,Text> input = in.mapToPair(new LongWritableTextToLongTextFunction()); //do actual conversion - return textCellToBinaryBlockLongIndex(sc, input, mcOut, lschema); + return textCellToBinaryBlockLongIndex(sc, input, mcOut, schema); } /** @@ -215,12 +212,18 @@ public class FrameRDDConverterUtils * @throws DMLRuntimeException */ public static JavaPairRDD<Long, FrameBlock> textCellToBinaryBlockLongIndex(JavaSparkContext sc, - JavaPairRDD<Long, Text> input, MatrixCharacteristics mcOut, List<ValueType> schema ) + JavaPairRDD<Long, Text> input, MatrixCharacteristics mc, List<ValueType> schema ) throws DMLRuntimeException { + //prepare default schema if needed + if( schema == null || schema.size()==1 ) { + schema = Collections.nCopies((int)mc.getCols(), + (schema!=null) ? 
schema.get(0) : ValueType.STRING); + } //convert textcell rdd to binary block rdd (w/ partial blocks) - JavaPairRDD<Long, FrameBlock> output = input.values().mapPartitionsToPair(new TextToBinaryBlockFunction( mcOut, schema )); + JavaPairRDD<Long, FrameBlock> output = input.values() + .mapPartitionsToPair(new TextToBinaryBlockFunction( mc, schema )); //aggregate partial matrix blocks JavaPairRDD<Long,FrameBlock> out = @@ -259,14 +262,9 @@ public class FrameRDDConverterUtils JavaPairRDD<MatrixIndexes, MatrixBlock> input, MatrixCharacteristics mcIn) throws DMLRuntimeException { - //Do actual conversion - JavaPairRDD<Long, FrameBlock> output = matrixBlockToBinaryBlockLongIndex(sc,input, mcIn); - - //convert input rdd to serializable LongWritable/frame block - JavaPairRDD<LongWritable,FrameBlock> out = - output.mapToPair(new LongFrameToLongWritableFrameFunction()); - - return out; + //convert and map to serializable LongWritable/frame block + return matrixBlockToBinaryBlockLongIndex(sc,input, mcIn) + .mapToPair(new LongFrameToLongWritableFrameFunction()); } @@ -285,16 +283,17 @@ public class FrameRDDConverterUtils JavaPairRDD<Long, FrameBlock> out = null; if(mcIn.getCols() > mcIn.getColsPerBlock()) { - + //convert matrix binary block to frame binary block out = input.flatMapToPair(new MatrixToBinaryBlockFunction(mcIn)); //aggregate partial frame blocks - if(mcIn.getCols() > mcIn.getColsPerBlock()) - out = (JavaPairRDD<Long, FrameBlock>) RDDAggregateUtils.mergeByFrameKey( out ); + out = (JavaPairRDD<Long, FrameBlock>) RDDAggregateUtils.mergeByFrameKey( out ); } - else + else { + //convert single matrix binary block to frame binary block (w/o shuffle) out = input.mapToPair(new MatrixToBinaryBlockOneColumnBlockFunction(mcIn)); - + } + return out; } @@ -725,10 +724,8 @@ public class FrameRDDConverterUtils private void flushBlocksToList( Long ix, FrameBlock fb, ArrayList<Tuple2<Long,FrameBlock>> ret ) throws DMLRuntimeException { - if( fb != null && fb.getNumRows()>0 ) { - 
fb.setSchema(_schema); //use shared schema + if( fb != null && fb.getNumRows()>0 ) ret.add(new Tuple2<Long,FrameBlock>(ix, fb)); - } } }