http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/7ba17c7f/src/main/java/org/apache/sysml/api/MLMatrix.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/api/MLMatrix.java b/src/main/java/org/apache/sysml/api/MLMatrix.java
deleted file mode 100644
index 45f631f..0000000
--- a/src/main/java/org/apache/sysml/api/MLMatrix.java
+++ /dev/null
@@ -1,428 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.sysml.api;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.rdd.RDD;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Row;
-import org.apache.spark.sql.SQLContext;
-import org.apache.spark.sql.SparkSession;
-import org.apache.spark.sql.catalyst.encoders.RowEncoder;
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
-import org.apache.spark.sql.execution.QueryExecution;
-import org.apache.spark.sql.types.StructType;
-import org.apache.sysml.hops.OptimizerUtils;
-import org.apache.sysml.runtime.DMLRuntimeException;
-import org.apache.sysml.runtime.instructions.spark.functions.GetMIMBFromRow;
-import org.apache.sysml.runtime.instructions.spark.functions.GetMLBlock;
-import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
-import org.apache.sysml.runtime.matrix.data.MatrixBlock;
-import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
-
-import scala.Tuple2;
-
-/**
- * Experimental API: Might be discontinued in a future release.
- *
- * This class serves four purposes:
- * 1. It allows SystemML to fit nicely into MLPipeline by reducing the number of reblocks.
- * 2. It allows users to easily read and write matrices without worrying
- * too much about the format, metadata and type of the underlying RDDs.
- * 3. It provides a mechanism to convert to and from MLlib's BlockMatrix format.
- * 4. It provides an off-the-shelf library for distributed blocked matrices and reduces the learning curve for using SystemML.
- * However, it is important to know that it is easy to abuse this off-the-shelf library and to think of it as a replacement
- * for writing DML, which it is not. It does not provide any optimization between calls. A simple example
- * of an optimization that is conveniently skipped is: (t(m) %*% m).
- * Also, note that this library is not thread-safe. The operator precedence is not exactly the same as in DML (as the precedence is
- * enforced by the Scala compiler), so please use appropriate brackets to enforce precedence.
-
- import org.apache.sysml.api.{MLContext, MLMatrix}
- val ml = new MLContext(sc)
- val mat1 = ml.read(sparkSession, "V_small.csv", "csv")
- val mat2 = ml.read(sparkSession, "W_small.mtx", "binary")
- val result = mat1.transpose() %*% mat2
- result.write("Result_small.mtx", "text")
-
- * @deprecated This will be removed in SystemML 1.0. Please migrate to {@link org.apache.sysml.api.mlcontext.MLContext}
- */
-@Deprecated
-public class MLMatrix extends Dataset<Row> {
-    private static final long serialVersionUID = -7005940673916671165L;
-
-    protected MatrixCharacteristics mc = null;
-    protected MLContext ml = null;
-
-    protected MLMatrix(SparkSession sparkSession, LogicalPlan logicalPlan, MLContext ml) {
-        super(sparkSession, logicalPlan, RowEncoder.apply(null));
-        this.ml = ml;
-    }
-
-    protected MLMatrix(SQLContext sqlContext, LogicalPlan logicalPlan, MLContext ml) {
-        super(sqlContext, logicalPlan, RowEncoder.apply(null));
-        this.ml = ml;
-    }
-
-    protected MLMatrix(SparkSession sparkSession, QueryExecution queryExecution, MLContext ml) {
-        super(sparkSession, queryExecution, RowEncoder.apply(null));
-        this.ml = ml;
-    }
-
-    protected MLMatrix(SQLContext sqlContext, QueryExecution queryExecution, MLContext ml) {
-        super(sqlContext.sparkSession(), queryExecution, RowEncoder.apply(null));
-        this.ml = ml;
-    }
-
-    // Only used internally to set a new MLMatrix after one of the matrix operations.
-    // Not to be used externally.
-    protected MLMatrix(Dataset<Row> df, MatrixCharacteristics mc, MLContext ml) throws DMLRuntimeException {
-        super(df.sparkSession(), df.logicalPlan(), RowEncoder.apply(null));
-        this.mc = mc;
-        this.ml = ml;
-    }
-
-    // TODO: replace default blocksize
-    static String writeStmt = "write(output, \"tmp\", format=\"binary\", rows_in_block=" + OptimizerUtils.DEFAULT_BLOCKSIZE + ", cols_in_block=" + OptimizerUtils.DEFAULT_BLOCKSIZE + ");";
-
-    // ------------------------------------------------------------------------------------------------
-
-//    /**
-//     * Experimental unstable API: Converts our blocked matrix format to MLlib's format
-//     * @return
-//     */
-//    public BlockMatrix toBlockedMatrix() {
-//        JavaPairRDD<MatrixIndexes, MatrixBlock> blocks = getRDDLazily(this);
-//        RDD<Tuple2<Tuple2<Object, Object>, Matrix>> mllibBlocks = blocks.mapToPair(new GetMLLibBlocks(mc.getRows(), mc.getCols(), mc.getRowsPerBlock(), mc.getColsPerBlock())).rdd();
-//        return new BlockMatrix(mllibBlocks, mc.getRowsPerBlock(), mc.getColsPerBlock(), mc.getRows(), mc.getCols());
-//    }
-
-    // ------------------------------------------------------------------------------------------------
-    static MLMatrix createMLMatrix(MLContext ml, SparkSession sparkSession, JavaPairRDD<MatrixIndexes, MatrixBlock> blocks, MatrixCharacteristics mc) throws DMLRuntimeException {
-        RDD<Row> rows = blocks.map(new GetMLBlock()).rdd();
-        StructType schema = MLBlock.getDefaultSchemaForBinaryBlock();
-        return new MLMatrix(sparkSession.createDataFrame(rows.toJavaRDD(), schema), mc, ml);
-    }
-
-    static MLMatrix createMLMatrix(MLContext ml, SQLContext sqlContext, JavaPairRDD<MatrixIndexes, MatrixBlock> blocks, MatrixCharacteristics mc) throws DMLRuntimeException {
-        SparkSession sparkSession = sqlContext.sparkSession();
-        return createMLMatrix(ml, sparkSession, blocks, mc);
-    }
-
-    /**
-     * Convenience method to write an MLMatrix.
-     *
-     * @param filePath the file path
-     * @param format the format
-     * @throws IOException if IOException occurs
-     * @throws DMLException if DMLException occurs
-     */
-    public void write(String filePath, String format) throws IOException, DMLException {
-        ml.reset();
-        ml.registerInput("left", this);
-        ml.executeScript("left = read(\"\"); output=left; write(output, \"" + filePath + "\", format=\"" + format + "\");");
-    }
-
-    private double getScalarBuiltinFunctionResult(String fn) throws IOException, DMLException {
-        if(fn.equals("nrow") || fn.equals("ncol")) {
-            ml.reset();
-            ml.registerInput("left", getRDDLazily(this), mc.getRows(), mc.getCols(), mc.getRowsPerBlock(), mc.getColsPerBlock(), mc.getNonZeros());
-            ml.registerOutput("output");
-            String script = "left = read(\"\");"
-                    + "val = " + fn + "(left); "
-                    + "output = matrix(val, rows=1, cols=1); "
-                    + writeStmt;
-            MLOutput out = ml.executeScript(script);
-            List<Tuple2<MatrixIndexes, MatrixBlock>> result = out.getBinaryBlockedRDD("output").collect();
-            if(result == null || result.size() != 1) {
-                throw new DMLRuntimeException("Error while computing the function: " + fn);
-            }
-            return result.get(0)._2.getValue(0, 0);
-        }
-        else {
-            throw new DMLRuntimeException("The function " + fn + " is not yet supported in MLMatrix");
-        }
-    }
-
-    /**
-     * Gets or computes the number of rows.
-     * @return the number of rows
-     * @throws IOException if IOException occurs
-     * @throws DMLException if DMLException occurs
-     */
-    public long numRows() throws IOException, DMLException {
-        if(mc.rowsKnown()) {
-            return mc.getRows();
-        }
-        else {
-            return (long) getScalarBuiltinFunctionResult("nrow");
-        }
-    }
-
-    /**
-     * Gets or computes the number of columns.
-     * @return the number of columns
-     * @throws IOException if IOException occurs
-     * @throws DMLException if DMLException occurs
-     */
-    public long numCols() throws IOException, DMLException {
-        if(mc.colsKnown()) {
-            return mc.getCols();
-        }
-        else {
-            return (long) getScalarBuiltinFunctionResult("ncol");
-        }
-    }
-
-    public int rowsPerBlock() {
-        return mc.getRowsPerBlock();
-    }
-
-    public int colsPerBlock() {
-        return mc.getColsPerBlock();
-    }
-
-    private String getScript(String binaryOperator) {
-        return "left = read(\"\");"
-                + "right = read(\"\");"
-                + "output = left " + binaryOperator + " right; "
-                + writeStmt;
-    }
-
-    private String getScalarBinaryScript(String binaryOperator, double scalar, boolean isScalarLeft) {
-        if(isScalarLeft) {
-            return "left = read(\"\");"
-                    + "output = " + scalar + " " + binaryOperator + " left ;"
-                    + writeStmt;
-        }
-        else {
-            return "left = read(\"\");"
-                    + "output = left " + binaryOperator + " " + scalar + ";"
-                    + writeStmt;
-        }
-    }
-
-    static JavaPairRDD<MatrixIndexes, MatrixBlock> getRDDLazily(MLMatrix mat) {
-        return mat.rdd().toJavaRDD().mapToPair(new GetMIMBFromRow());
-    }
-
-    private MLMatrix matrixBinaryOp(MLMatrix that, String op) throws IOException, DMLException {
-
-        if(mc.getRowsPerBlock() != that.mc.getRowsPerBlock() || mc.getColsPerBlock() != that.mc.getColsPerBlock()) {
-            throw new DMLRuntimeException("Incompatible block sizes: brlen:" + mc.getRowsPerBlock() + "!=" + that.mc.getRowsPerBlock() + " || bclen:" + mc.getColsPerBlock() + "!=" + that.mc.getColsPerBlock());
-        }
-
-        if(op.equals("%*%")) {
-            if(mc.getCols() != that.mc.getRows()) {
-                throw new DMLRuntimeException("Dimensions mismatch:" + mc.getCols() + "!=" + that.mc.getRows());
-            }
-        }
-        else {
-            if(mc.getRows() != that.mc.getRows() || mc.getCols() != that.mc.getCols()) {
-                throw new DMLRuntimeException("Dimensions mismatch:" + mc.getRows() + "!=" + that.mc.getRows() + " || " + mc.getCols() + "!=" + that.mc.getCols());
-            }
-        }
-
-        ml.reset();
-        ml.registerInput("left", this);
-        ml.registerInput("right", that);
-        ml.registerOutput("output");
-        MLOutput out = ml.executeScript(getScript(op));
-        RDD<Row> rows = out.getBinaryBlockedRDD("output").map(new GetMLBlock()).rdd();
-        StructType schema = MLBlock.getDefaultSchemaForBinaryBlock();
-        MatrixCharacteristics mcOut = out.getMatrixCharacteristics("output");
-        return new MLMatrix(this.sparkSession().createDataFrame(rows.toJavaRDD(), schema), mcOut, ml);
-    }
-
-    private MLMatrix scalarBinaryOp(Double scalar, String op, boolean isScalarLeft) throws IOException, DMLException {
-        ml.reset();
-        ml.registerInput("left", this);
-        ml.registerOutput("output");
-        MLOutput out = ml.executeScript(getScalarBinaryScript(op, scalar, isScalarLeft));
-        RDD<Row> rows = out.getBinaryBlockedRDD("output").map(new GetMLBlock()).rdd();
-        StructType schema = MLBlock.getDefaultSchemaForBinaryBlock();
-        MatrixCharacteristics mcOut = out.getMatrixCharacteristics("output");
-        return new MLMatrix(this.sparkSession().createDataFrame(rows.toJavaRDD(), schema), mcOut, ml);
-    }
-
-    // ---------------------------------------------------
-    // Simple operator loading, but does not utilize the optimizer
-
-    public MLMatrix $greater(MLMatrix that) throws IOException, DMLException {
-        return matrixBinaryOp(that, ">");
-    }
-
-    public MLMatrix $less(MLMatrix that) throws IOException, DMLException {
-        return matrixBinaryOp(that, "<");
-    }
-
-    public MLMatrix $greater$eq(MLMatrix that) throws IOException, DMLException {
-        return matrixBinaryOp(that, ">=");
-    }
-
-    public MLMatrix $less$eq(MLMatrix that) throws IOException, DMLException {
-        return matrixBinaryOp(that, "<=");
-    }
-
-    public MLMatrix $eq$eq(MLMatrix that) throws IOException, DMLException {
-        return matrixBinaryOp(that, "==");
-    }
-
-    public MLMatrix $bang$eq(MLMatrix that) throws IOException, DMLException {
-        return matrixBinaryOp(that, "!=");
-    }
-
-    public MLMatrix $up(MLMatrix that) throws IOException, DMLException {
-        return matrixBinaryOp(that, "^");
-    }
-
-    public MLMatrix exp(MLMatrix that) throws IOException, DMLException {
-        return matrixBinaryOp(that, "^");
-    }
-
-    public MLMatrix $plus(MLMatrix that) throws IOException, DMLException {
-        return matrixBinaryOp(that, "+");
-    }
-
-    public MLMatrix add(MLMatrix that) throws IOException, DMLException {
-        return matrixBinaryOp(that, "+");
-    }
-
-    public MLMatrix $minus(MLMatrix that) throws IOException, DMLException {
-        return matrixBinaryOp(that, "-");
-    }
-
-    public MLMatrix minus(MLMatrix that) throws IOException, DMLException {
-        return matrixBinaryOp(that, "-");
-    }
-
-    public MLMatrix $times(MLMatrix that) throws IOException, DMLException {
-        return matrixBinaryOp(that, "*");
-    }
-
-    public MLMatrix elementWiseMultiply(MLMatrix that) throws IOException, DMLException {
-        return matrixBinaryOp(that, "*");
-    }
-
-    public MLMatrix $div(MLMatrix that) throws IOException, DMLException {
-        return matrixBinaryOp(that, "/");
-    }
-
-    public MLMatrix divide(MLMatrix that) throws IOException, DMLException {
-        return matrixBinaryOp(that, "/");
-    }
-
-    public MLMatrix $percent$div$percent(MLMatrix that) throws IOException, DMLException {
-        return matrixBinaryOp(that, "%/%");
-    }
-
-    public MLMatrix integerDivision(MLMatrix that) throws IOException, DMLException {
-        return matrixBinaryOp(that, "%/%");
-    }
-
-    public MLMatrix $percent$percent(MLMatrix that) throws IOException, DMLException {
-        return matrixBinaryOp(that, "%%");
-    }
-
-    public MLMatrix modulus(MLMatrix that) throws IOException, DMLException {
-        return matrixBinaryOp(that, "%%");
-    }
-
-    public MLMatrix $percent$times$percent(MLMatrix that) throws IOException, DMLException {
-        return matrixBinaryOp(that, "%*%");
-    }
-
-    public MLMatrix multiply(MLMatrix that) throws IOException, DMLException {
-        return matrixBinaryOp(that, "%*%");
-    }
-
-    public MLMatrix transpose() throws IOException, DMLException {
-        ml.reset();
-        ml.registerInput("left", this);
-        ml.registerOutput("output");
-        String script = "left = read(\"\");"
-                + "output = t(left); "
-                + writeStmt;
-        MLOutput out = ml.executeScript(script);
-        RDD<Row> rows = out.getBinaryBlockedRDD("output").map(new GetMLBlock()).rdd();
-        StructType schema = MLBlock.getDefaultSchemaForBinaryBlock();
-        MatrixCharacteristics mcOut = out.getMatrixCharacteristics("output");
-        return new MLMatrix(this.sparkSession().createDataFrame(rows.toJavaRDD(), schema), mcOut, ml);
-    }
-
-    // TODO: For 'scalar op matrix' operations: Do implicit conversions
-    public MLMatrix $plus(Double scalar) throws IOException, DMLException {
-        return scalarBinaryOp(scalar, "+", false);
-    }
-
-    public MLMatrix add(Double scalar) throws IOException, DMLException {
-        return scalarBinaryOp(scalar, "+", false);
-    }
-
-    public MLMatrix $minus(Double scalar) throws IOException, DMLException {
-        return scalarBinaryOp(scalar, "-", false);
-    }
-
-    public MLMatrix minus(Double scalar) throws IOException, DMLException {
-        return scalarBinaryOp(scalar, "-", false);
-    }
-
-    public MLMatrix $times(Double scalar) throws IOException, DMLException {
-        return scalarBinaryOp(scalar, "*", false);
-    }
-
-    public MLMatrix elementWiseMultiply(Double scalar) throws IOException, DMLException {
-        return scalarBinaryOp(scalar, "*", false);
-    }
-
-    public MLMatrix $div(Double scalar) throws IOException, DMLException {
-        return scalarBinaryOp(scalar, "/", false);
-    }
-
-    public MLMatrix divide(Double scalar) throws IOException, DMLException {
-        return scalarBinaryOp(scalar, "/", false);
-    }
-
-    public MLMatrix $greater(Double scalar) throws IOException, DMLException {
-        return scalarBinaryOp(scalar, ">", false);
-    }
-
-    public MLMatrix $less(Double scalar) throws IOException, DMLException {
-        return scalarBinaryOp(scalar, "<", false);
-    }
-
-    public MLMatrix $greater$eq(Double scalar) throws IOException, DMLException {
-        return scalarBinaryOp(scalar, ">=", false);
-    }
-
-    public MLMatrix $less$eq(Double scalar) throws IOException, DMLException {
-        return scalarBinaryOp(scalar, "<=", false);
-    }
-
-    public MLMatrix $eq$eq(Double scalar) throws IOException, DMLException {
-        return scalarBinaryOp(scalar, "==", false);
-    }
-
-    public MLMatrix $bang$eq(Double scalar) throws IOException, DMLException {
-        return scalarBinaryOp(scalar, "!=", false);
-    }
-
-}
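
The @deprecated tag above points to the new org.apache.sysml.api.mlcontext.MLContext API. Below is a minimal migration sketch for the Scala snippet in the class Javadoc, written against the new API's Script/ScriptFactory types; the file names come from that Javadoc example, while the class name and surrounding scaffolding are illustrative and not part of this commit:

    import org.apache.spark.sql.SparkSession;
    import org.apache.sysml.api.mlcontext.MLContext;
    import org.apache.sysml.api.mlcontext.Script;
    import static org.apache.sysml.api.mlcontext.ScriptFactory.dml;

    public class MLMatrixMigration {
        public static void main(String[] args) {
            SparkSession spark = SparkSession.builder().appName("MLMatrixMigration").getOrCreate();
            MLContext ml = new MLContext(spark.sparkContext());

            // The whole expression runs as a single DML script, so the optimizer
            // sees t(V) %*% W as one plan rather than the per-operator round trips
            // that MLMatrix issued (the cross-call optimization noted in the Javadoc).
            Script script = dml(
                  "V = read(\"V_small.csv\", format=\"csv\");"
                + "W = read(\"W_small.mtx\", format=\"binary\");"
                + "R = t(V) %*% W;"
                + "write(R, \"Result_small.mtx\", format=\"text\");");
            ml.execute(script);
        }
    }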
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/7ba17c7f/src/main/java/org/apache/sysml/api/MLOutput.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/api/MLOutput.java b/src/main/java/org/apache/sysml/api/MLOutput.java
deleted file mode 100644
index a16eccd..0000000
--- a/src/main/java/org/apache/sysml/api/MLOutput.java
+++ /dev/null
@@ -1,267 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.api;
-
-import java.util.Map;
-
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Row;
-import org.apache.spark.sql.SQLContext;
-import org.apache.spark.sql.SparkSession;
-import org.apache.spark.sql.types.StructType;
-import org.apache.sysml.runtime.DMLRuntimeException;
-import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
-import org.apache.sysml.runtime.instructions.spark.functions.GetMLBlock;
-import org.apache.sysml.runtime.instructions.spark.utils.FrameRDDConverterUtils;
-import org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtils;
-import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
-import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties;
-import org.apache.sysml.runtime.matrix.data.FrameBlock;
-import org.apache.sysml.runtime.matrix.data.MatrixBlock;
-import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
-
-/**
- * This is a simple container object that returns the output of execute from MLContext.
- *
- * @deprecated This will be removed in SystemML 1.0. Please migrate to {@link org.apache.sysml.api.mlcontext.MLContext}
- * and {@link org.apache.sysml.api.mlcontext.MLResults}
- */
-@Deprecated
-public class MLOutput {
-
-    Map<String, JavaPairRDD<?,?>> _outputs;
-    private Map<String, MatrixCharacteristics> _outMetadata = null;
-
-    public MLOutput(Map<String, JavaPairRDD<?,?>> outputs, Map<String, MatrixCharacteristics> outMetadata) {
-        this._outputs = outputs;
-        this._outMetadata = outMetadata;
-    }
-
-    public MatrixBlock getMatrixBlock(String varName) throws DMLRuntimeException {
-        MatrixCharacteristics mc = getMatrixCharacteristics(varName);
-        // The matrix block is always pushed to an RDD and then we do a collect.
-        // We can later avoid this by returning the symbol table rather than "Map<String, JavaPairRDD<MatrixIndexes,MatrixBlock>> _outputs".
-        return SparkExecutionContext.toMatrixBlock(getBinaryBlockedRDD(varName), (int) mc.getRows(), (int) mc.getCols(),
-                mc.getRowsPerBlock(), mc.getColsPerBlock(), mc.getNonZeros());
-    }
-
-    @SuppressWarnings("unchecked")
-    public JavaPairRDD<MatrixIndexes,MatrixBlock> getBinaryBlockedRDD(String varName) throws DMLRuntimeException {
-        if(_outputs.containsKey(varName)) {
-            return (JavaPairRDD<MatrixIndexes,MatrixBlock>) _outputs.get(varName);
-        }
-        throw new DMLRuntimeException("Variable " + varName + " not found in the outputs.");
-    }
-
-    @SuppressWarnings("unchecked")
-    public JavaPairRDD<Long,FrameBlock> getFrameBinaryBlockedRDD(String varName) throws DMLRuntimeException {
-        if(_outputs.containsKey(varName)) {
-            return (JavaPairRDD<Long,FrameBlock>)_outputs.get(varName);
-        }
-        throw new DMLRuntimeException("Variable " + varName + " not found in the outputs.");
-    }
-
-    public MatrixCharacteristics getMatrixCharacteristics(String varName) throws DMLRuntimeException {
-        if(_outputs.containsKey(varName)) {
-            return _outMetadata.get(varName);
-        }
-        throw new DMLRuntimeException("Variable " + varName + " not found in the output symbol table.");
-    }
-
-    /**
-     * Note: the output DataFrame has an additional ID column.
-     * An easy way to get the DataFrame without the ID column is df.drop("__INDEX").
-     *
-     * @param sparkSession the Spark Session
-     * @param varName the variable name
-     * @return the DataFrame
-     * @throws DMLRuntimeException if DMLRuntimeException occurs
-     */
-    public Dataset<Row> getDF(SparkSession sparkSession, String varName) throws DMLRuntimeException {
-        if(sparkSession == null) {
-            throw new DMLRuntimeException("SparkSession is not created.");
-        }
-        JavaPairRDD<MatrixIndexes,MatrixBlock> rdd = getBinaryBlockedRDD(varName);
-        if(rdd != null) {
-            MatrixCharacteristics mc = _outMetadata.get(varName);
-            return RDDConverterUtils.binaryBlockToDataFrame(sparkSession, rdd, mc, false);
-        }
-        throw new DMLRuntimeException("Variable " + varName + " not found in the output symbol table.");
-    }
-
-    /**
-     * Note: the output DataFrame has an additional ID column.
-     * An easy way to get the DataFrame without the ID column is df.drop("__INDEX").
-     *
-     * @param sqlContext the SQL Context
-     * @param varName the variable name
-     * @return the DataFrame
-     * @throws DMLRuntimeException if DMLRuntimeException occurs
-     */
-    public Dataset<Row> getDF(SQLContext sqlContext, String varName) throws DMLRuntimeException {
-        if (sqlContext == null) {
-            throw new DMLRuntimeException("SQLContext is not created.");
-        }
-        SparkSession sparkSession = sqlContext.sparkSession();
-        return getDF(sparkSession, varName);
-    }
-
-    /**
-     * Obtain the DataFrame.
-     *
-     * @param sparkSession the Spark Session
-     * @param varName the variable name
-     * @param outputVector if true, returns a DataFrame with two columns: ID and org.apache.spark.ml.linalg.Vector
-     * @return the DataFrame
-     * @throws DMLRuntimeException if DMLRuntimeException occurs
-     */
-    public Dataset<Row> getDF(SparkSession sparkSession, String varName, boolean outputVector) throws DMLRuntimeException {
-        if(sparkSession == null) {
-            throw new DMLRuntimeException("SparkSession is not created.");
-        }
-        if(outputVector) {
-            JavaPairRDD<MatrixIndexes,MatrixBlock> rdd = getBinaryBlockedRDD(varName);
-            if(rdd != null) {
-                MatrixCharacteristics mc = _outMetadata.get(varName);
-                return RDDConverterUtils.binaryBlockToDataFrame(sparkSession, rdd, mc, true);
-            }
-            throw new DMLRuntimeException("Variable " + varName + " not found in the output symbol table.");
-        }
-        else {
-            return getDF(sparkSession, varName);
-        }
-    }
-
-    /**
-     * Obtain the DataFrame.
-     *
-     * @param sqlContext the SQL Context
-     * @param varName the variable name
-     * @param outputVector if true, returns a DataFrame with two columns: ID and org.apache.spark.ml.linalg.Vector
-     * @return the DataFrame
-     * @throws DMLRuntimeException if DMLRuntimeException occurs
-     */
-    public Dataset<Row> getDF(SQLContext sqlContext, String varName, boolean outputVector) throws DMLRuntimeException {
-        if (sqlContext == null) {
-            throw new DMLRuntimeException("SQLContext is not created.");
-        }
-        SparkSession sparkSession = sqlContext.sparkSession();
-        return getDF(sparkSession, varName, outputVector);
-    }
-
-    /**
-     * This method improves the performance of MLPipeline wrappers.
-     *
-     * @param sparkSession the Spark Session
-     * @param varName the variable name
-     * @param mc the matrix characteristics
-     * @return the DataFrame
-     * @throws DMLRuntimeException if DMLRuntimeException occurs
-     */
-    public Dataset<Row> getDF(SparkSession sparkSession, String varName, MatrixCharacteristics mc)
-        throws DMLRuntimeException
-    {
-        if(sparkSession == null) {
-            throw new DMLRuntimeException("SparkSession is not created.");
-        }
-        JavaPairRDD<MatrixIndexes,MatrixBlock> binaryBlockRDD = getBinaryBlockedRDD(varName);
-        return RDDConverterUtils.binaryBlockToDataFrame(sparkSession, binaryBlockRDD, mc, true);
-    }
-
-    /**
-     * This method improves the performance of MLPipeline wrappers.
-     *
-     * @param sqlContext the SQL Context
-     * @param varName the variable name
-     * @param mc the matrix characteristics
-     * @return the DataFrame
-     * @throws DMLRuntimeException if DMLRuntimeException occurs
-     */
-    public Dataset<Row> getDF(SQLContext sqlContext, String varName, MatrixCharacteristics mc)
-        throws DMLRuntimeException
-    {
-        if (sqlContext == null) {
-            throw new DMLRuntimeException("SQLContext is not created.");
-        }
-        SparkSession sparkSession = sqlContext.sparkSession();
-        return getDF(sparkSession, varName, mc);
-    }
-
-    public JavaRDD<String> getStringRDD(String varName, String format) throws DMLRuntimeException {
-        if(format.equals("text")) {
-            JavaPairRDD<MatrixIndexes, MatrixBlock> binaryRDD = getBinaryBlockedRDD(varName);
-            MatrixCharacteristics mcIn = getMatrixCharacteristics(varName);
-            return RDDConverterUtils.binaryBlockToTextCell(binaryRDD, mcIn);
-        }
-        else {
-            throw new DMLRuntimeException("The output format:" + format + " is not implemented yet.");
-        }
-    }
-
-    public JavaRDD<String> getStringFrameRDD(String varName, String format, CSVFileFormatProperties fprop) throws DMLRuntimeException {
-        JavaPairRDD<Long, FrameBlock> binaryRDD = getFrameBinaryBlockedRDD(varName);
-        MatrixCharacteristics mcIn = getMatrixCharacteristics(varName);
-        if(format.equals("csv")) {
-            return FrameRDDConverterUtils.binaryBlockToCsv(binaryRDD, mcIn, fprop, false);
-        }
-        else if(format.equals("text")) {
-            return FrameRDDConverterUtils.binaryBlockToTextCell(binaryRDD, mcIn);
-        }
-        else {
-            throw new DMLRuntimeException("The output format:" + format + " is not implemented yet.");
-        }
-    }
-
-    public Dataset<Row> getDataFrameRDD(String varName, JavaSparkContext jsc) throws DMLRuntimeException {
-        JavaPairRDD<Long, FrameBlock> binaryRDD = getFrameBinaryBlockedRDD(varName);
-        MatrixCharacteristics mcIn = getMatrixCharacteristics(varName);
-        SparkSession sparkSession = SparkSession.builder().sparkContext(jsc.sc()).getOrCreate();
-        return FrameRDDConverterUtils.binaryBlockToDataFrame(sparkSession, binaryRDD, mcIn, null);
-    }
-
-    public MLMatrix getMLMatrix(MLContext ml, SparkSession sparkSession, String varName) throws DMLRuntimeException {
-        if(sparkSession == null) {
-            throw new DMLRuntimeException("SparkSession is not created.");
-        }
-        else if(ml == null) {
-            throw new DMLRuntimeException("MLContext is not created.");
-        }
-        JavaPairRDD<MatrixIndexes,MatrixBlock> rdd = getBinaryBlockedRDD(varName);
-        if(rdd != null) {
-            MatrixCharacteristics mc = getMatrixCharacteristics(varName);
-            StructType schema = MLBlock.getDefaultSchemaForBinaryBlock();
-            return new MLMatrix(sparkSession.createDataFrame(rdd.map(new GetMLBlock()).rdd(), schema), mc, ml);
-        }
-        throw new DMLRuntimeException("Variable " + varName + " not found in the output symbol table.");
-    }
-
-    public MLMatrix getMLMatrix(MLContext ml, SQLContext sqlContext, String varName) throws DMLRuntimeException {
-        if (sqlContext == null) {
-            throw new DMLRuntimeException("SQLContext is not created.");
-        }
-        SparkSession sparkSession = sqlContext.sparkSession();
-        return getMLMatrix(ml, sparkSession, varName);
-    }
-}
\ No newline at end of file
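
As the deprecation note says, MLOutput's accessors map onto org.apache.sysml.api.mlcontext.MLResults. A minimal sketch, assuming the new API's Script.in()/Script.out() and MLResults.getDataFrame() as documented; the DML one-liner, the input data, and the class name are illustrative, not part of this commit:

    import java.util.Arrays;

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.RowFactory;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructType;
    import org.apache.sysml.api.mlcontext.MLContext;
    import org.apache.sysml.api.mlcontext.MLResults;
    import org.apache.sysml.api.mlcontext.Script;
    import static org.apache.sysml.api.mlcontext.ScriptFactory.dml;

    public class MLOutputMigration {
        public static void main(String[] args) {
            SparkSession spark = SparkSession.builder().appName("MLOutputMigration").getOrCreate();
            MLContext ml = new MLContext(spark.sparkContext());

            // A tiny one-column numeric DataFrame as input (illustrative).
            Dataset<Row> X = spark.createDataFrame(
                    Arrays.asList(RowFactory.create(1.0), RowFactory.create(2.0)),
                    new StructType().add("C1", DataTypes.DoubleType));

            // in()/out() replace registerInput()/registerOutput();
            // MLResults replaces MLOutput.
            Script script = dml("Y = X * 2;").in("X", X).out("Y");
            MLResults results = ml.execute(script);

            // Analogous to MLOutput.getDF(sparkSession, "Y"); as with getDF, the
            // returned DataFrame carries a row-index column ("__INDEX") that can
            // be dropped via df.drop("__INDEX").
            Dataset<Row> Y = results.getDataFrame("Y");
            Y.show();
        }
    }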
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/7ba17c7f/src/main/java/org/apache/sysml/api/python/SystemML.py
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/api/python/SystemML.py b/src/main/java/org/apache/sysml/api/python/SystemML.py
deleted file mode 100644
index b22c570..0000000
--- a/src/main/java/org/apache/sysml/api/python/SystemML.py
+++ /dev/null
@@ -1,232 +0,0 @@
-#!/usr/bin/python
-#-------------------------------------------------------------
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-#-------------------------------------------------------------
-
-from __future__ import division
-from py4j.protocol import Py4JJavaError, Py4JError
-import traceback
-import os
-from pyspark.context import SparkContext
-from pyspark.sql import DataFrame, SparkSession
-from pyspark.rdd import RDD
-
-
-class MLContext(object):
-
-    """
-    Simple wrapper class for MLContext in SystemML.jar
-    ...
-    Attributes
-    ----------
-    ml : MLContext
-        A reference to the Java MLContext
-    sc : SparkContext
-        The SparkContext that has been specified during initialization
-    """
-
-    def __init__(self, sc, *args):
-        """
-        If initialized with a SparkContext, will connect to the Java MLContext
-        class.
-        args:
-            sc (SparkContext): the current SparkContext
-            monitor (boolean=False): whether to monitor the performance
-            forceSpark (boolean=False): whether to force execution on Spark
-        returns:
-            MLContext: an instance of MLContext
-        """
-
-        try:
-            monitorPerformance = (args[0] if len(args) > 0 else False)
-            setForcedSparkExecType = (args[1] if len(args) > 1 else False)
-            self.sc = sc
-            self.ml = sc._jvm.org.apache.sysml.api.MLContext(sc._jsc, monitorPerformance, setForcedSparkExecType)
-            self.sparkSession = SparkSession.builder.getOrCreate()
-        except Py4JError:
-            traceback.print_exc()
-
-    def reset(self):
-        """
-        Call this method if you want to clear any RDDs set via
-        registerInput or registerOutput.
-        """
-        try:
-            self.ml.reset()
-        except Py4JJavaError:
-            traceback.print_exc()
-
-    def execute(self, dmlScriptFilePath, *args):
-        """
-        Executes the script in spark-mode by passing the arguments to the
-        MLContext java class.
-        Returns:
-            MLOutput: an instance of the MLOutput class
-        """
-        numArgs = len(args) + 1
-        try:
-            if numArgs == 1:
-                jmlOut = self.ml.execute(dmlScriptFilePath)
-                mlOut = MLOutput(jmlOut, self.sc)
-                return mlOut
-            elif numArgs == 2:
-                jmlOut = self.ml.execute(dmlScriptFilePath, args[0])
-                mlOut = MLOutput(jmlOut, self.sc)
-                return mlOut
-            elif numArgs == 3:
-                jmlOut = self.ml.execute(dmlScriptFilePath, args[0], args[1])
-                mlOut = MLOutput(jmlOut, self.sc)
-                return mlOut
-            elif numArgs == 4:
-                jmlOut = self.ml.execute(dmlScriptFilePath, args[0], args[1], args[2])
-                mlOut = MLOutput(jmlOut, self.sc)
-                return mlOut
-            else:
-                raise TypeError('Arguments do not match MLContext-API')
-        except Py4JJavaError:
-            traceback.print_exc()
-
-    def executeScript(self, dmlScript, nargs=None, outputs=None, isPyDML=False, configFilePath=None):
-        """
-        Executes the script in spark-mode by passing the arguments to the
-        MLContext java class.
-        Returns:
-            MLOutput: an instance of the MLOutput class
-        """
-        try:
-            # Register inputs as needed
-            if nargs is not None:
-                for key, value in list(nargs.items()):
-                    if isinstance(value, DataFrame):
-                        self.registerInput(key, value)
-                        del nargs[key]
-                    else:
-                        nargs[key] = str(value)
-            else:
-                nargs = {}
-
-            # Register outputs as needed
-            if outputs is not None:
-                for out in outputs:
-                    self.registerOutput(out)
-
-            # Execute script
-            jml_out = self.ml.executeScript(dmlScript, nargs, isPyDML, configFilePath)
-            ml_out = MLOutput(jml_out, self.sc)
-            return ml_out
-        except Py4JJavaError:
-            traceback.print_exc()
-
-    def registerInput(self, varName, src, *args):
-        """
-        Method to register inputs used by the DML script.
-        Supported formats:
-        1. DataFrame
-        2. CSV/Text (as JavaRDD<String> or JavaPairRDD<LongWritable, Text>)
-        3. Binary blocked RDD (JavaPairRDD<MatrixIndexes,MatrixBlock>)
-        Also overloaded to support metadata information such as format, rlen, clen, ...
-        Please note that the variable names given here correspond to the variables in the DML script.
-        These variables need to have corresponding read/write statements in the DML script.
-        Currently, only matrix variables are supported through the registerInput/registerOutput interface.
-        To pass scalar variables, use named/positional arguments (described later) or wrap them into a matrix variable.
-        """
-        numArgs = len(args) + 2
-
-        if hasattr(src, '_jdf'):
-            rdd = src._jdf
-        elif hasattr(src, '_jrdd'):
-            rdd = src._jrdd
-        else:
-            rdd = src
-
-        try:
-            if numArgs == 2:
-                self.ml.registerInput(varName, rdd)
-            elif numArgs == 3:
-                self.ml.registerInput(varName, rdd, args[0])
-            elif numArgs == 4:
-                self.ml.registerInput(varName, rdd, args[0], args[1])
-            elif numArgs == 5:
-                self.ml.registerInput(varName, rdd, args[0], args[1], args[2])
-            elif numArgs == 6:
-                self.ml.registerInput(varName, rdd, args[0], args[1], args[2], args[3])
-            elif numArgs == 7:
-                self.ml.registerInput(varName, rdd, args[0], args[1], args[2], args[3], args[4])
-            elif numArgs == 10:
-                self.ml.registerInput(varName, rdd, args[0], args[1], args[2], args[3], args[4], args[5], args[6], args[7])
-            else:
-                raise TypeError('Arguments do not match MLContext-API')
-        except Py4JJavaError:
-            traceback.print_exc()
-
-    def registerOutput(self, varName):
-        """
-        Register output variables used in the DML script.
-        args:
-            varName: (String) the name used in the DML script
-        """
-
-        try:
-            self.ml.registerOutput(varName)
-        except Py4JJavaError:
-            traceback.print_exc()
-
-    def getDmlJson(self):
-        try:
-            return self.ml.getMonitoringUtil().getRuntimeInfoInJSONFormat()
-        except Py4JJavaError:
-            traceback.print_exc()
-
-
-class MLOutput(object):
-
-    """
-    This is a simple wrapper object that returns the output of execute from MLContext
-    ...
-    Attributes
-    ----------
-    jmlOut : MLOutput
-        A reference to the Java MLOutput object through py4j
-    """
-
-    def __init__(self, jmlOut, sc):
-        self.jmlOut = jmlOut
-        self.sc = sc
-
-    def getBinaryBlockedRDD(self, varName):
-        raise Exception('Not supported in Python MLContext')
-
-    def getMatrixCharacteristics(self, varName):
-        raise Exception('Not supported in Python MLContext')
-
-    def getDF(self, sparkSession, varName):
-        try:
-            jdf = self.jmlOut.getDF(sparkSession, varName)
-            df = DataFrame(jdf, sparkSession)
-            return df
-        except Py4JJavaError:
-            traceback.print_exc()
-
-    def getMLMatrix(self, sparkSession, varName):
-        raise Exception('Not supported in Python MLContext')
-
-    def getStringRDD(self, varName, format):
-        raise Exception('Not supported in Python MLContext')
-
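
The deleted Python wrapper has the same replacement story: the mlcontext interface shipped with the systemml Python package. A minimal sketch, assuming that package's MLContext/dml API; the script string, input data, and variable names are illustrative, not part of this commit:

    from pyspark.sql import SparkSession
    from systemml import MLContext, dml

    spark = SparkSession.builder.getOrCreate()
    ml = MLContext(spark.sparkContext)

    # A tiny one-column numeric DataFrame as input (illustrative).
    X = spark.createDataFrame([(float(i),) for i in range(10)], ["C1"])

    # input()/output() replace registerInput()/registerOutput().
    script = dml("Y = X * 2").input(X=X).output("Y")
    results = ml.execute(script)

    # Replaces MLOutput.getDF(...): get() returns a Matrix handle that can be
    # converted to a Spark DataFrame.
    Y = results.get("Y").toDF()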
