Repository: incubator-systemml Updated Branches: refs/heads/master ce84288f0 -> 2b7fdb2b3
[SYSTEMML-812] Runtime integration compressed linear algebra Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/616793d7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/616793d7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/616793d7 Branch: refs/heads/master Commit: 616793d798f06a26092171b8588e6eb94e6aa21d Parents: ce84288 Author: Matthias Boehm <[email protected]> Authored: Sat Jul 16 20:08:23 2016 -0700 Committer: Matthias Boehm <[email protected]> Committed: Sat Jul 16 20:08:23 2016 -0700 ---------------------------------------------------------------------- .../instructions/CPInstructionParser.java | 6 +- .../instructions/SPInstructionParser.java | 6 ++ .../cp/AggregateBinaryCPInstruction.java | 7 +- .../runtime/instructions/cp/CPInstruction.java | 2 +- .../cp/CompressionCPInstruction.java | 63 +++++++++++++ .../spark/CompressionSPInstruction.java | 97 ++++++++++++++++++++ .../instructions/spark/SPInstruction.java | 2 +- .../functions/CreateSparseBlockFunction.java | 3 +- .../sysml/runtime/matrix/data/MatrixBlock.java | 4 +- .../matrix/data/OperationsOnMatrixValues.java | 6 +- 10 files changed, 188 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/616793d7/src/main/java/org/apache/sysml/runtime/instructions/CPInstructionParser.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/CPInstructionParser.java b/src/main/java/org/apache/sysml/runtime/instructions/CPInstructionParser.java index 2df3615..b6e0c50 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/CPInstructionParser.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/CPInstructionParser.java @@ -37,6 +37,7 @@ import 
org.apache.sysml.runtime.instructions.cp.BuiltinBinaryCPInstruction; import org.apache.sysml.runtime.instructions.cp.BuiltinUnaryCPInstruction; import org.apache.sysml.runtime.instructions.cp.CPInstruction; import org.apache.sysml.runtime.instructions.cp.CentralMomentCPInstruction; +import org.apache.sysml.runtime.instructions.cp.CompressionCPInstruction; import org.apache.sysml.runtime.instructions.cp.ConvolutionCPInstruction; import org.apache.sysml.runtime.instructions.cp.CovarianceCPInstruction; import org.apache.sysml.runtime.instructions.cp.DataGenCPInstruction; @@ -266,7 +267,7 @@ public class CPInstructionParser extends InstructionParser String2CPInstructionType.put( "eigen", CPINSTRUCTION_TYPE.MultiReturnBuiltin); String2CPInstructionType.put( "partition", CPINSTRUCTION_TYPE.Partition); - + String2CPInstructionType.put( "compress", CPINSTRUCTION_TYPE.Compression); //CP FILE instruction String2CPFileInstructionType = new HashMap<String, CPINSTRUCTION_TYPE>(); @@ -418,6 +419,9 @@ public class CPInstructionParser extends InstructionParser case Partition: return DataPartitionCPInstruction.parseInstruction(str); + case Compression: + return (CPInstruction) CompressionCPInstruction.parseInstruction(str); + case CentralMoment: return CentralMomentCPInstruction.parseInstruction(str); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/616793d7/src/main/java/org/apache/sysml/runtime/instructions/SPInstructionParser.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/SPInstructionParser.java b/src/main/java/org/apache/sysml/runtime/instructions/SPInstructionParser.java index 11e0ea0..e8437fb 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/SPInstructionParser.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/SPInstructionParser.java @@ -22,6 +22,7 @@ package org.apache.sysml.runtime.instructions; import java.util.HashMap; import 
org.apache.sysml.lops.Checkpoint; +import org.apache.sysml.lops.Compression; import org.apache.sysml.lops.DataGen; import org.apache.sysml.lops.WeightedCrossEntropy; import org.apache.sysml.lops.WeightedCrossEntropyR; @@ -48,6 +49,7 @@ import org.apache.sysml.runtime.instructions.spark.CSVReblockSPInstruction; import org.apache.sysml.runtime.instructions.spark.CastSPInstruction; import org.apache.sysml.runtime.instructions.spark.CentralMomentSPInstruction; import org.apache.sysml.runtime.instructions.spark.CheckpointSPInstruction; +import org.apache.sysml.runtime.instructions.spark.CompressionSPInstruction; import org.apache.sysml.runtime.instructions.spark.CovarianceSPInstruction; import org.apache.sysml.runtime.instructions.spark.CpmmSPInstruction; import org.apache.sysml.runtime.instructions.spark.CumulativeAggregateSPInstruction; @@ -183,6 +185,7 @@ public class SPInstructionParser extends InstructionParser // Spark-specific instructions String2SPInstructionType.put( Checkpoint.OPCODE, SPINSTRUCTION_TYPE.Checkpoint); + String2SPInstructionType.put( Compression.OPCODE, SPINSTRUCTION_TYPE.Compression); // Builtin Instruction Opcodes String2SPInstructionType.put( "log" , SPINSTRUCTION_TYPE.Builtin); @@ -427,6 +430,9 @@ public class SPInstructionParser extends InstructionParser case Checkpoint: return CheckpointSPInstruction.parseInstruction(str); + case Compression: + return CompressionSPInstruction.parseInstruction(str); + case Cast: return CastSPInstruction.parseInstruction(str); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/616793d7/src/main/java/org/apache/sysml/runtime/instructions/cp/AggregateBinaryCPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/cp/AggregateBinaryCPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/cp/AggregateBinaryCPInstruction.java index 05a1bc5..374a21b 100644 --- 
a/src/main/java/org/apache/sysml/runtime/instructions/cp/AggregateBinaryCPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/cp/AggregateBinaryCPInstruction.java @@ -22,6 +22,7 @@ package org.apache.sysml.runtime.instructions.cp; import org.apache.sysml.parser.Expression.DataType; import org.apache.sysml.parser.Expression.ValueType; import org.apache.sysml.runtime.DMLRuntimeException; +import org.apache.sysml.runtime.compress.CompressedMatrixBlock; import org.apache.sysml.runtime.controlprogram.context.ExecutionContext; import org.apache.sysml.runtime.functionobjects.Multiply; import org.apache.sysml.runtime.functionobjects.Plus; @@ -80,7 +81,11 @@ public class AggregateBinaryCPInstruction extends BinaryCPInstruction //compute matrix multiplication AggregateBinaryOperator ab_op = (AggregateBinaryOperator) _optr; - MatrixBlock soresBlock = (MatrixBlock) (matBlock1.aggregateBinaryOperations(matBlock1, matBlock2, new MatrixBlock(), ab_op)); + MatrixBlock soresBlock = null; + if( matBlock2 instanceof CompressedMatrixBlock ) + soresBlock = (MatrixBlock) (matBlock2.aggregateBinaryOperations(matBlock1, matBlock2, new MatrixBlock(), ab_op)); + else + soresBlock = (MatrixBlock) (matBlock1.aggregateBinaryOperations(matBlock1, matBlock2, new MatrixBlock(), ab_op)); //release inputs/outputs ec.releaseMatrixInput(input1.getName()); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/616793d7/src/main/java/org/apache/sysml/runtime/instructions/cp/CPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/cp/CPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/cp/CPInstruction.java index 38e92bb..158e26f 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/cp/CPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/cp/CPInstruction.java @@ -30,7 +30,7 @@ import 
org.apache.sysml.runtime.matrix.operators.Operator; public abstract class CPInstruction extends Instruction { - public enum CPINSTRUCTION_TYPE { INVALID, AggregateUnary, AggregateBinary, AggregateTernary, ArithmeticBinary, Ternary, Quaternary, BooleanBinary, BooleanUnary, BuiltinBinary, BuiltinUnary, MultiReturnParameterizedBuiltin, ParameterizedBuiltin, MultiReturnBuiltin, Builtin, Reorg, RelationalBinary, File, Variable, External, Append, Rand, QSort, QPick, MatrixIndexing, MMTSJ, PMMJ, MMChain, MatrixReshape, Partition, StringInit, CentralMoment, Covariance, UaggOuterChain, Convolution }; + public enum CPINSTRUCTION_TYPE { INVALID, AggregateUnary, AggregateBinary, AggregateTernary, ArithmeticBinary, Ternary, Quaternary, BooleanBinary, BooleanUnary, BuiltinBinary, BuiltinUnary, MultiReturnParameterizedBuiltin, ParameterizedBuiltin, MultiReturnBuiltin, Builtin, Reorg, RelationalBinary, File, Variable, External, Append, Rand, QSort, QPick, MatrixIndexing, MMTSJ, PMMJ, MMChain, MatrixReshape, Partition, Compression, StringInit, CentralMoment, Covariance, UaggOuterChain, Convolution }; protected CPINSTRUCTION_TYPE _cptype; protected Operator _optr; http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/616793d7/src/main/java/org/apache/sysml/runtime/instructions/cp/CompressionCPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/cp/CompressionCPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/cp/CompressionCPInstruction.java new file mode 100644 index 0000000..333bfbb --- /dev/null +++ b/src/main/java/org/apache/sysml/runtime/instructions/cp/CompressionCPInstruction.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysml.runtime.instructions.cp; + +import org.apache.sysml.runtime.DMLRuntimeException; +import org.apache.sysml.runtime.compress.CompressedMatrixBlock; +import org.apache.sysml.runtime.controlprogram.context.ExecutionContext; +import org.apache.sysml.runtime.instructions.Instruction; +import org.apache.sysml.runtime.instructions.InstructionUtils; +import org.apache.sysml.runtime.matrix.data.MatrixBlock; +import org.apache.sysml.runtime.matrix.operators.Operator; + +public class CompressionCPInstruction extends UnaryCPInstruction +{ + + public CompressionCPInstruction(Operator op, CPOperand in, CPOperand out, String opcode, String istr){ + super(op, in, null, null, out, opcode, istr); + } + + public static Instruction parseInstruction(String str) + throws DMLRuntimeException + { + String[] parts = InstructionUtils.getInstructionPartsWithValueType(str); + String opcode = parts[0]; + CPOperand in1 = new CPOperand(parts[1]); + CPOperand out = new CPOperand(parts[2]); + + return new CompressionCPInstruction(null, in1, out, opcode, str); + } + + @Override + public void processInstruction( ExecutionContext ec ) + throws DMLRuntimeException + { + //get matrix block input + MatrixBlock in = ec.getMatrixInput(input1.getName()); + + //compress the matrix block + CompressedMatrixBlock cmb = new CompressedMatrixBlock(in); + cmb.compress(); + + //set output and release 
input + ec.releaseMatrixInput(input1.getName()); + ec.setMatrixOutput(output.getName(), cmb); + } +} http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/616793d7/src/main/java/org/apache/sysml/runtime/instructions/spark/CompressionSPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/CompressionSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/CompressionSPInstruction.java new file mode 100644 index 0000000..5d61754 --- /dev/null +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/CompressionSPInstruction.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.sysml.runtime.instructions.spark; + +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.function.Function; +import org.apache.sysml.runtime.DMLRuntimeException; +import org.apache.sysml.runtime.compress.CompressedMatrixBlock; +import org.apache.sysml.runtime.controlprogram.context.ExecutionContext; +import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext; +import org.apache.sysml.runtime.instructions.InstructionUtils; +import org.apache.sysml.runtime.instructions.cp.CPOperand; +import org.apache.sysml.runtime.matrix.data.MatrixBlock; +import org.apache.sysml.runtime.matrix.data.MatrixIndexes; +import org.apache.sysml.runtime.matrix.operators.Operator; + + +public class CompressionSPInstruction extends UnarySPInstruction +{ + public CompressionSPInstruction(Operator op, CPOperand in, CPOperand out, String opcode, String istr){ + super(op, in, out, opcode, istr); + _sptype = SPINSTRUCTION_TYPE.Compression; //same type the parser maps Compression.OPCODE to + } + + /** + * Parses a Spark compression instruction with one input and one output operand. + * @param str instruction string + * @return the parsed compression instruction + * @throws DMLRuntimeException if the field-count check or operand parsing fails + */ + public static CompressionSPInstruction parseInstruction ( String str ) + throws DMLRuntimeException + { + InstructionUtils.checkNumFields(str, 2); + String[] parts = InstructionUtils.getInstructionPartsWithValueType(str); + + String opcode = parts[0]; + CPOperand in = new CPOperand(parts[1]); + CPOperand out = new CPOperand(parts[2]); + + return new CompressionSPInstruction(null, in, out, opcode, str); + } + + @Override + public void processInstruction(ExecutionContext ec) + throws DMLRuntimeException + { + SparkExecutionContext sec = (SparkExecutionContext)ec; + + //get input rdd handle + JavaPairRDD<MatrixIndexes,MatrixBlock> in = + 
sec.getBinaryBlockRDDHandleForVariable( input1.getName() ); + + //execute compression + JavaPairRDD<MatrixIndexes,MatrixBlock> out = + in.mapValues(new CompressionFunction()); + + //set outputs + sec.setRDDHandleForVariable(output.getName(), out); + sec.addLineageRDD(input1.getName(),
output.getName()); + } + + /** + * + */ + public static class CompressionFunction implements Function<MatrixBlock,MatrixBlock> + { + private static final long serialVersionUID = -6528833083609423922L; + + @Override + public MatrixBlock call(MatrixBlock arg0) + throws Exception + { + CompressedMatrixBlock cmb = new CompressedMatrixBlock(arg0); + cmb.compress(); + + return cmb; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/616793d7/src/main/java/org/apache/sysml/runtime/instructions/spark/SPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/SPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/SPInstruction.java index d9c49aa..0c0d3f0 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/SPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/SPInstruction.java @@ -40,7 +40,7 @@ public abstract class SPInstruction extends Instruction public enum SPINSTRUCTION_TYPE { MAPMM, MAPMMCHAIN, CPMM, RMM, TSMM, PMM, ZIPMM, PMAPMM, //matrix multiplication instructions MatrixIndexing, Reorg, ArithmeticBinary, RelationalBinary, AggregateUnary, AggregateTernary, Reblock, CSVReblock, - Builtin, BuiltinUnary, BuiltinBinary, MultiReturnBuiltin, Checkpoint, Cast, + Builtin, BuiltinUnary, BuiltinBinary, MultiReturnBuiltin, Checkpoint, Compression, Cast, CentralMoment, Covariance, QSort, QPick, ParameterizedBuiltin, MAppend, RAppend, GAppend, GAlignedAppend, Rand, MatrixReshape, Ternary, Quaternary, CumsumAggregate, CumsumOffset, BinUaggChain, UaggOuterChain, http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/616793d7/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/CreateSparseBlockFunction.java ---------------------------------------------------------------------- diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/CreateSparseBlockFunction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/CreateSparseBlockFunction.java index 7cf6e8c..93f91ec 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/CreateSparseBlockFunction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/CreateSparseBlockFunction.java @@ -19,6 +19,7 @@ package org.apache.sysml.runtime.instructions.spark.functions; import org.apache.spark.api.java.function.Function; +import org.apache.sysml.runtime.compress.CompressedMatrixBlock; import org.apache.sysml.runtime.matrix.data.MatrixBlock; import org.apache.sysml.runtime.matrix.data.SparseBlock; @@ -44,7 +45,7 @@ public class CreateSparseBlockFunction implements Function<MatrixBlock,MatrixBlo { //convert given block to CSR representation if in sparse format //but allow shallow pass-through if already in CSR representation. - if( arg0.isInSparseFormat() ) + if( arg0.isInSparseFormat() && !(arg0 instanceof CompressedMatrixBlock) ) return new MatrixBlock(arg0, _stype, false); else //pass through dense return arg0; http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/616793d7/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java index 68cb43a..a7654cc 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java @@ -6139,8 +6139,8 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab * @return */ public boolean isThreadSafe() { - return !sparse || (sparseBlock != null) ? 
sparseBlock.isThreadSafe() : - DEFAULT_SPARSEBLOCK == SparseBlock.Type.MCSR; //only MCSR thread-safe + return !sparse || ((sparseBlock != null) ? sparseBlock.isThreadSafe() : + DEFAULT_SPARSEBLOCK == SparseBlock.Type.MCSR); //only MCSR thread-safe } /** http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/616793d7/src/main/java/org/apache/sysml/runtime/matrix/data/OperationsOnMatrixValues.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/OperationsOnMatrixValues.java b/src/main/java/org/apache/sysml/runtime/matrix/data/OperationsOnMatrixValues.java index 8ea1ddd..219a1e7 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/data/OperationsOnMatrixValues.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/data/OperationsOnMatrixValues.java @@ -26,6 +26,7 @@ import java.util.List; import org.apache.sysml.parser.Expression.ValueType; import org.apache.sysml.runtime.DMLRuntimeException; +import org.apache.sysml.runtime.compress.CompressedMatrixBlock; import org.apache.sysml.runtime.controlprogram.caching.MatrixObject.UpdateType; import org.apache.sysml.runtime.functionobjects.Builtin; import org.apache.sysml.runtime.instructions.spark.utils.SparkUtils; @@ -284,7 +285,10 @@ public class OperationsOnMatrixValues indexesOut.setIndexes(indexes1.getRowIndex(), indexes2.getColumnIndex()); //perform on the value - value1.aggregateBinaryOperations(indexes1, value1, indexes2, value2, valueOut, op); + if( value2 instanceof CompressedMatrixBlock ) + value2.aggregateBinaryOperations(value1, value2, valueOut, op); + else //default + value1.aggregateBinaryOperations(indexes1, value1, indexes2, value2, valueOut, op); } public static void performAggregateBinaryIgnoreIndexes(
