Repository: incubator-systemml Updated Branches: refs/heads/gh-pages cb6f8456f -> bfb93b03c
[SYSTEMML-1181] Remove old Spark MLContext API documentation Closes #377. Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/bfb93b03 Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/bfb93b03 Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/bfb93b03 Branch: refs/heads/gh-pages Commit: bfb93b03c71493ac210a2b7594eb2a3d3c15fcb5 Parents: cb6f845 Author: Felix Schueler <felix.schue...@ibm.com> Authored: Fri Feb 3 18:06:10 2017 -0800 Committer: Deron Eriksson <de...@us.ibm.com> Committed: Fri Feb 3 18:06:10 2017 -0800 ---------------------------------------------------------------------- spark-mlcontext-programming-guide.md | 1057 ----------------------------- 1 file changed, 1057 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/bfb93b03/spark-mlcontext-programming-guide.md ---------------------------------------------------------------------- diff --git a/spark-mlcontext-programming-guide.md b/spark-mlcontext-programming-guide.md index 45c0091..e5df11f 100644 --- a/spark-mlcontext-programming-guide.md +++ b/spark-mlcontext-programming-guide.md @@ -34,10 +34,6 @@ The Spark `MLContext` API offers a programmatic interface for interacting with S such as Scala, Java, and Python. As a result, it offers a convenient way to interact with SystemML from the Spark Shell and from Notebooks such as Jupyter and Zeppelin. -**NOTE: A new MLContext API has been redesigned for future SystemML releases. The old API is available -in previous versions of SystemML but is deprecated and will be removed soon, so please migrate to the new API.** - - # Spark Shell Example ## Start Spark Shell with SystemML @@ -1822,1059 +1818,6 @@ plt.title('PNMF Training Loss') --- -# Spark Shell Example - OLD API - -### ** **NOTE: This API is old and has been deprecated.** ** -**Please use the [new MLContext API](spark-mlcontext-programming-guide#spark-shell-example) instead.** - -## Start Spark Shell with SystemML - -To use SystemML with the Spark Shell, the SystemML jar can be referenced using the Spark Shell's `--jars` option. -Instructions to build the SystemML jar can be found in the [SystemML GitHub README](https://github.com/apache/incubator-systemml). - -{% highlight bash %} -./bin/spark-shell --executor-memory 4G --driver-memory 4G --jars SystemML.jar -{% endhighlight %} - -Here is an example of Spark Shell with SystemML and YARN. - -{% highlight bash %} -./bin/spark-shell --master yarn-client --num-executors 3 --driver-memory 5G --executor-memory 5G --executor-cores 4 --jars SystemML.jar -{% endhighlight %} - - -## Create MLContext - -An `MLContext` object can be created by passing its constructor a reference to the `SparkContext`. - -<div class="codetabs"> - -<div data-lang="Spark Shell" markdown="1"> -{% highlight scala %} -scala>import org.apache.sysml.api.MLContext -import org.apache.sysml.api.MLContext - -scala> val ml = new MLContext(sc) -ml: org.apache.sysml.api.MLContext = org.apache.sysml.api.MLContext@33e38c6b -{% endhighlight %} -</div> - -<div data-lang="Statements" markdown="1"> -{% highlight scala %} -import org.apache.sysml.api.MLContext -val ml = new MLContext(sc) -{% endhighlight %} -</div> - -</div> - - -## Create DataFrame - -For demonstration purposes, we'll create a `DataFrame` consisting of 100,000 rows and 1,000 columns -of random `double`s. 
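An editorial aside, not part of the removed guide: at 8 bytes per `double`, this 100,000 x 1,000 matrix occupies roughly 800 MB when dense, which is worth keeping in mind alongside the 4G driver and executor memory used when starting the shell above.

{% highlight scala %}
// Editorial sketch (not in the removed guide): rough dense size of the matrix created below.
val cells = 100000L * 1000L            // 100,000,000 cells
val bytes = cells * 8L                 // 800,000,000 bytes at 8 bytes per double
val gib   = bytes / math.pow(1024, 3)  // ~0.75 GiB, before any Spark/JVM overhead
{% endhighlight %}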
- -<div class="codetabs"> - -<div data-lang="Spark Shell" markdown="1"> -{% highlight scala %} -scala> import org.apache.spark.sql._ -import org.apache.spark.sql._ - -scala> import org.apache.spark.sql.types.{StructType,StructField,DoubleType} -import org.apache.spark.sql.types.{StructType, StructField, DoubleType} - -scala> import scala.util.Random -import scala.util.Random - -scala> val numRows = 100000 -numRows: Int = 100000 - -scala> val numCols = 1000 -numCols: Int = 1000 - -scala> val data = sc.parallelize(0 to numRows-1).map { _ => Row.fromSeq(Seq.fill(numCols)(Random.nextDouble)) } -data: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[1] at map at <console>:33 - -scala> val schema = StructType((0 to numCols-1).map { i => StructField("C" + i, DoubleType, true) } ) -schema: org.apache.spark.sql.types.StructType = StructType(StructField(C0,DoubleType,true), StructField(C1,DoubleType,true), StructField(C2,DoubleType,true), StructField(C3,DoubleType,true), StructField(C4,DoubleType,true), StructField(C5,DoubleType,true), StructField(C6,DoubleType,true), StructField(C7,DoubleType,true), StructField(C8,DoubleType,true), StructField(C9,DoubleType,true), StructField(C10,DoubleType,true), StructField(C11,DoubleType,true), StructField(C12,DoubleType,true), StructField(C13,DoubleType,true), StructField(C14,DoubleType,true), StructField(C15,DoubleType,true), StructField(C16,DoubleType,true), StructField(C17,DoubleType,true), StructField(C18,DoubleType,true), StructField(C19,DoubleType,true), StructField(C20,DoubleType,true), StructField(C21,DoubleType,true), ... - -scala> val df = spark.createDataFrame(data, schema) -df: org.apache.spark.sql.DataFrame = [C0: double, C1: double, C2: double, C3: double, C4: double, C5: double, C6: double, C7: double, C8: double, C9: double, C10: double, C11: double, C12: double, C13: double, C14: double, C15: double, C16: double, C17: double, C18: double, C19: double, C20: double, C21: double, C22: double, C23: double, C24: double, C25: double, C26: double, C27: double, C28: double, C29: double, C30: double, C31: double, C32: double, C33: double, C34: double, C35: double, C36: double, C37: double, C38: double, C39: double, C40: double, C41: double, C42: double, C43: double, C44: double, C45: double, C46: double, C47: double, C48: double, C49: double, C50: double, C51: double, C52: double, C53: double, C54: double, C55: double, C56: double, C57: double, C58: double, C5... - -{% endhighlight %} -</div> - -<div data-lang="Statements" markdown="1"> -{% highlight scala %} -import org.apache.spark.sql._ -import org.apache.spark.sql.types.{StructType,StructField,DoubleType} -import scala.util.Random -val numRows = 100000 -val numCols = 1000 -val data = sc.parallelize(0 to numRows-1).map { _ => Row.fromSeq(Seq.fill(numCols)(Random.nextDouble)) } -val schema = StructType((0 to numCols-1).map { i => StructField("C" + i, DoubleType, true) } ) -val df = spark.createDataFrame(data, schema) -{% endhighlight %} -</div> - -</div> - - -## Helper Methods - -For convenience, we'll create some helper methods. The SystemML output data is encapsulated in -an `MLOutput` object. The `getScalar()` method extracts a scalar value from a `DataFrame` returned by -`MLOutput`. The `getScalarDouble()` method returns such a value as a `Double`, and the -`getScalarInt()` method returns such a value as an `Int`. 
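An editorial aside, not part of the removed guide, on why the helpers below read field 1 of the first row: for a 1x1 matrix output, `MLOutput.getDF` returns a DataFrame with a row-`ID` column followed by the cell value (the same `ID` column is sorted and dropped when handling `beta_out` in the Zeppelin linear regression example further below), so the scalar of interest sits at field index 1.

{% highlight scala %}
// Editorial sketch (not in the removed guide). Hypothetical first row of the
// DataFrame returned for the 1x1 "m" output used further below: [ID, C1].
import org.apache.spark.sql.Row
val firstRow = Row(1.0, 100000.0)            // field 0: row id, field 1: cell value
val value = firstRow(1).asInstanceOf[Double] // 100000.0 -- what getScalarDouble extracts
{% endhighlight %}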
- -<div class="codetabs"> - -<div data-lang="Spark Shell" markdown="1"> -{% highlight scala %} -scala> import org.apache.sysml.api.MLOutput -import org.apache.sysml.api.MLOutput - -scala> def getScalar(outputs: MLOutput, symbol: String): Any = - | outputs.getDF(spark.sqlContext, symbol).first()(1) -getScalar: (outputs: org.apache.sysml.api.MLOutput, symbol: String)Any - -scala> def getScalarDouble(outputs: MLOutput, symbol: String): Double = - | getScalar(outputs, symbol).asInstanceOf[Double] -getScalarDouble: (outputs: org.apache.sysml.api.MLOutput, symbol: String)Double - -scala> def getScalarInt(outputs: MLOutput, symbol: String): Int = - | getScalarDouble(outputs, symbol).toInt -getScalarInt: (outputs: org.apache.sysml.api.MLOutput, symbol: String)Int - -{% endhighlight %} -</div> - -<div data-lang="Statements" markdown="1"> -{% highlight scala %} -import org.apache.sysml.api.MLOutput -def getScalar(outputs: MLOutput, symbol: String): Any = -outputs.getDF(spark.sqlContext, symbol).first()(1) -def getScalarDouble(outputs: MLOutput, symbol: String): Double = -getScalar(outputs, symbol).asInstanceOf[Double] -def getScalarInt(outputs: MLOutput, symbol: String): Int = -getScalarDouble(outputs, symbol).toInt - -{% endhighlight %} -</div> - -</div> - - -## Convert DataFrame to Binary-Block Matrix - -SystemML is optimized to operate on a binary-block format for matrix representation. For large -datasets, conversion from DataFrame to binary-block can require a significant quantity of time. -Explicit DataFrame to binary-block conversion allows algorithm performance to be measured separately -from data conversion time. - -The SystemML binary-block matrix representation can be thought of as a two-dimensional array of blocks, where each block -consists of a number of rows and columns. In this example, we specify a matrix consisting -of blocks of size 1000x1000. The experimental `dataFrameToBinaryBlock()` method of `RDDConverterUtilsExt` is used -to convert the `DataFrame df` to a SystemML binary-block matrix, which is represented by the datatype -`JavaPairRDD[MatrixIndexes, MatrixBlock]`. 
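As an editorial aside (not part of the removed guide), the blocking itself can be pictured as follows: with 1000x1000 blocks, the 100,000 x 1,000 matrix below tiles into a 100 x 1 grid of blocks, and a cell at 1-based position (i, j) falls into the block at (ceil(i/1000), ceil(j/1000)).

{% highlight scala %}
// Editorial sketch (not in the removed guide): how the 100000 x 1000 matrix is
// tiled when using 1000 x 1000 blocks, as in the conversion shown below.
val rowsPerBlock = 1000L
val colsPerBlock = 1000L
// 1-based cell position (i, j) -> 1-based block position
def blockOf(i: Long, j: Long): (Long, Long) =
  ((i + rowsPerBlock - 1) / rowsPerBlock, (j + colsPerBlock - 1) / colsPerBlock)

blockOf(1L, 1L)         // (1,1)   -- top-left block
blockOf(100000L, 1000L) // (100,1) -- bottom-right block of the 100 x 1 block grid
{% endhighlight %}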
- -<div class="codetabs"> - -<div data-lang="Spark Shell" markdown="1"> -{% highlight scala %} -scala> import org.apache.sysml.runtime.instructions.spark.utils.{RDDConverterUtilsExt => RDDConverterUtils} -import org.apache.sysml.runtime.instructions.spark.utils.{RDDConverterUtilsExt=>RDDConverterUtils} - -scala> import org.apache.sysml.runtime.matrix.MatrixCharacteristics; -import org.apache.sysml.runtime.matrix.MatrixCharacteristics - -scala> val numRowsPerBlock = 1000 -numRowsPerBlock: Int = 1000 - -scala> val numColsPerBlock = 1000 -numColsPerBlock: Int = 1000 - -scala> val mc = new MatrixCharacteristics(numRows, numCols, numRowsPerBlock, numColsPerBlock) -mc: org.apache.sysml.runtime.matrix.MatrixCharacteristics = [100000 x 1000, nnz=-1, blocks (1000 x 1000)] - -scala> val sysMlMatrix = RDDConverterUtils.dataFrameToBinaryBlock(sc, df, mc, false) -sysMlMatrix: org.apache.spark.api.java.JavaPairRDD[org.apache.sysml.runtime.matrix.data.MatrixIndexes,org.apache.sysml.runtime.matrix.data.MatrixBlock] = org.apache.spark.api.java.JavaPairRDD@2bce3248 - -{% endhighlight %} -</div> - -<div data-lang="Statements" markdown="1"> -{% highlight scala %} -import org.apache.sysml.runtime.instructions.spark.utils.{RDDConverterUtilsExt => RDDConverterUtils} -import org.apache.sysml.runtime.matrix.MatrixCharacteristics; -val numRowsPerBlock = 1000 -val numColsPerBlock = 1000 -val mc = new MatrixCharacteristics(numRows, numCols, numRowsPerBlock, numColsPerBlock) -val sysMlMatrix = RDDConverterUtils.dataFrameToBinaryBlock(sc, df, mc, false) - -{% endhighlight %} -</div> - -</div> - - -## DML Script - -For this example, we will utilize the following DML Script called `shape.dml` that reads in a matrix and outputs the number of rows and the -number of columns, each represented as a matrix. - -{% highlight r %} -X = read($Xin) -m = matrix(nrow(X), rows=1, cols=1) -n = matrix(ncol(X), rows=1, cols=1) -write(m, $Mout) -write(n, $Nout) -{% endhighlight %} - - -## Execute Script - -Let's execute our DML script, as shown in the example below. The call to `reset()` of `MLContext` is not necessary here, but this method should -be called if you need to reset inputs and outputs or if you would like to call `execute()` with a different script. - -An example of registering the `DataFrame df` as an input to the `X` variable is shown but commented out. If a DataFrame is registered directly, -it will implicitly be converted to SystemML's binary-block format. However, since we've already explicitly converted the DataFrame to the -binary-block fixed variable `systemMlMatrix`, we will register this input to the `X` variable. We register the `m` and `n` variables -as outputs. - -When SystemML is executed via `DMLScript` (such as in Standalone Mode), inputs are supplied as either command-line named arguments -or positional argument. These inputs are specified in DML scripts by prepending them with a `$`. Values are read from or written -to files using `read`/`write` (DML) and `load`/`save` (PyDML) statements. When utilizing the `MLContext` API, -inputs and outputs can be other data representations, such as `DataFrame`s. The input and output data are bound to DML variables. -The named arguments in the `shape.dml` script do not have default values set for them, so we create a `Map` to map the required named -arguments to blank `String`s so that the script can pass validation. - -The `shape.dml` script is executed by the call to `execute()`, where we supply the `Map` of required named arguments. 
The -execution results are returned as the `MLOutput` fixed variable `outputs`. The number of rows is obtained by calling the `getStaticInt()` -helper method with the `outputs` object and `"m"`. The number of columns is retrieved by calling `getStaticInt()` with -`outputs` and `"n"`. - -<div class="codetabs"> - -<div data-lang="Spark Shell" markdown="1"> -{% highlight scala %} -scala> ml.reset() - -scala> //ml.registerInput("X", df) // implicit conversion of DataFrame to binary-block - -scala> ml.registerInput("X", sysMlMatrix, numRows, numCols) - -scala> ml.registerOutput("m") - -scala> ml.registerOutput("n") - -scala> val nargs = Map("Xin" -> " ", "Mout" -> " ", "Nout" -> " ") -nargs: scala.collection.immutable.Map[String,String] = Map(Xin -> " ", Mout -> " ", Nout -> " ") - -scala> val outputs = ml.execute("shape.dml", nargs) -15/10/12 16:29:15 WARN : Your hostname, derons-mbp.usca.ibm.com resolves to a loopback/non-reachable address: 127.0.0.1, but we couldn't find any external IP address! -15/10/12 16:29:15 WARN OptimizerUtils: Auto-disable multi-threaded text read for 'text' and 'csv' due to thread contention on JRE < 1.8 (java.version=1.7.0_80). -outputs: org.apache.sysml.api.MLOutput = org.apache.sysml.api.MLOutput@4d424743 - -scala> val m = getScalarInt(outputs, "m") -m: Int = 100000 - -scala> val n = getScalarInt(outputs, "n") -n: Int = 1000 - -{% endhighlight %} -</div> - -<div data-lang="Statements" markdown="1"> -{% highlight scala %} -ml.reset() -//ml.registerInput("X", df) // implicit conversion of DataFrame to binary-block -ml.registerInput("X", sysMlMatrix, numRows, numCols) -ml.registerOutput("m") -ml.registerOutput("n") -val nargs = Map("Xin" -> " ", "Mout" -> " ", "Nout" -> " ") -val outputs = ml.execute("shape.dml", nargs) -val m = getScalarInt(outputs, "m") -val n = getScalarInt(outputs, "n") - -{% endhighlight %} -</div> - -</div> - - -## DML Script as String - -The `MLContext` API allows a DML script to be specified -as a `String`. Here, we specify a DML script as a fixed `String` variable called `minMaxMeanScript`. -This DML will find the minimum, maximum, and mean value of a matrix. - -<div class="codetabs"> - -<div data-lang="Spark Shell" markdown="1"> -{% highlight scala %} -scala> val minMaxMeanScript: String = - | """ - | Xin = read(" ") - | minOut = matrix(min(Xin), rows=1, cols=1) - | maxOut = matrix(max(Xin), rows=1, cols=1) - | meanOut = matrix(mean(Xin), rows=1, cols=1) - | write(minOut, " ") - | write(maxOut, " ") - | write(meanOut, " ") - | """ -minMaxMeanScript: String = -" -Xin = read(" ") -minOut = matrix(min(Xin), rows=1, cols=1) -maxOut = matrix(max(Xin), rows=1, cols=1) -meanOut = matrix(mean(Xin), rows=1, cols=1) -write(minOut, " ") -write(maxOut, " ") -write(meanOut, " ") -" - -{% endhighlight %} -</div> - -<div data-lang="Statements" markdown="1"> -{% highlight scala %} -val minMaxMeanScript: String = -""" -Xin = read(" ") -minOut = matrix(min(Xin), rows=1, cols=1) -maxOut = matrix(max(Xin), rows=1, cols=1) -meanOut = matrix(mean(Xin), rows=1, cols=1) -write(minOut, " ") -write(maxOut, " ") -write(meanOut, " ") -""" - -{% endhighlight %} -</div> - -</div> - -## Scala Wrapper for DML - -We can create a Scala wrapper for our invocation of the `minMaxMeanScript` DML `String`. The `minMaxMean()` method -takes a `JavaPairRDD[MatrixIndexes, MatrixBlock]` parameter, which is a SystemML binary-block matrix representation. 
-It also takes a `rows` parameter indicating the number of rows in the matrix, a `cols` parameter indicating the number -of columns in the matrix, and an `MLContext` parameter. The `minMaxMean()` method -returns a tuple consisting of the minimum value in the matrix, the maximum value in the matrix, and the computed -mean value of the matrix. - -<div class="codetabs"> - -<div data-lang="Spark Shell" markdown="1"> -{% highlight scala %} -scala> import org.apache.sysml.runtime.matrix.data.MatrixIndexes -import org.apache.sysml.runtime.matrix.data.MatrixIndexes - -scala> import org.apache.sysml.runtime.matrix.data.MatrixBlock -import org.apache.sysml.runtime.matrix.data.MatrixBlock - -scala> import org.apache.spark.api.java.JavaPairRDD -import org.apache.spark.api.java.JavaPairRDD - -scala> def minMaxMean(mat: JavaPairRDD[MatrixIndexes, MatrixBlock], rows: Int, cols: Int, ml: MLContext): (Double, Double, Double) = { - | ml.reset() - | ml.registerInput("Xin", mat, rows, cols) - | ml.registerOutput("minOut") - | ml.registerOutput("maxOut") - | ml.registerOutput("meanOut") - | val outputs = ml.executeScript(minMaxMeanScript) - | val minOut = getScalarDouble(outputs, "minOut") - | val maxOut = getScalarDouble(outputs, "maxOut") - | val meanOut = getScalarDouble(outputs, "meanOut") - | (minOut, maxOut, meanOut) - | } -minMaxMean: (mat: org.apache.spark.api.java.JavaPairRDD[org.apache.sysml.runtime.matrix.data.MatrixIndexes,org.apache.sysml.runtime.matrix.data.MatrixBlock], rows: Int, cols: Int, ml: org.apache.sysml.api.MLContext)(Double, Double, Double) - -{% endhighlight %} -</div> - -<div data-lang="Statements" markdown="1"> -{% highlight scala %} -import org.apache.sysml.runtime.matrix.data.MatrixIndexes -import org.apache.sysml.runtime.matrix.data.MatrixBlock -import org.apache.spark.api.java.JavaPairRDD -def minMaxMean(mat: JavaPairRDD[MatrixIndexes, MatrixBlock], rows: Int, cols: Int, ml: MLContext): (Double, Double, Double) = { -ml.reset() -ml.registerInput("Xin", mat, rows, cols) -ml.registerOutput("minOut") -ml.registerOutput("maxOut") -ml.registerOutput("meanOut") -val outputs = ml.executeScript(minMaxMeanScript) -val minOut = getScalarDouble(outputs, "minOut") -val maxOut = getScalarDouble(outputs, "maxOut") -val meanOut = getScalarDouble(outputs, "meanOut") -(minOut, maxOut, meanOut) -} - -{% endhighlight %} -</div> - -</div> - - -## Invoking DML via Scala Wrapper - -Here, we invoke `minMaxMeanScript` using our `minMaxMean()` Scala wrapper method. It returns a tuple -consisting of the minimum value in the matrix, the maximum value in the matrix, and the mean value of the matrix. - -<div class="codetabs"> - -<div data-lang="Spark Shell" markdown="1"> -{% highlight scala %} -scala> val (min, max, mean) = minMaxMean(sysMlMatrix, numRows, numCols, ml) -15/10/13 14:33:11 WARN OptimizerUtils: Auto-disable multi-threaded text read for 'text' and 'csv' due to thread contention on JRE < 1.8 (java.version=1.7.0_80). 
-min: Double = 5.378949397005783E-9 -max: Double = 0.9999999934660398 -mean: Double = 0.499988222338507 - -{% endhighlight %} -</div> - -<div data-lang="Statements" markdown="1"> -{% highlight scala %} -val (min, max, mean) = minMaxMean(sysMlMatrix, numRows, numCols, ml) - -{% endhighlight %} -</div> - -</div> - ---- - -# Zeppelin Notebook Example - Linear Regression Algorithm - OLD API - -### ** **NOTE: This API is old and has been deprecated.** ** -**Please use the [new MLContext API](spark-mlcontext-programming-guide#spark-shell-example) instead.** - -Next, we'll consider an example of a SystemML linear regression algorithm run from Spark through an Apache Zeppelin notebook. -Instructions to clone and build Zeppelin can be found at the [GitHub Apache Zeppelin](https://github.com/apache/incubator-zeppelin) -site. This example also will look at the Spark ML linear regression algorithm. - -This Zeppelin notebook example can be imported by choosing `Import note` -> `Add from URL` from the Zeppelin main page, then insert the following URL: - - https://raw.githubusercontent.com/apache/incubator-systemml/master/samples/zeppelin-notebooks/2AZ2AQ12B/note.json - -Alternatively download <a href="https://raw.githubusercontent.com/apache/incubator-systemml/master/samples/zeppelin-notebooks/2AZ2AQ12B/note.json" download="note.json">note.json</a>, then import it by choosing `Import note` -> `Choose a JSON here` from the Zeppelin main page. - -A `conf/zeppelin-env.sh` file is created based on `conf/zeppelin-env.sh.template`. For -this demonstration, it features `SPARK_HOME`, `SPARK_SUBMIT_OPTIONS`, and `ZEPPELIN_SPARK_USEHIVECONTEXT` -environment variables: - - export SPARK_HOME=/Users/example/spark-1.5.1-bin-hadoop2.6 - export SPARK_SUBMIT_OPTIONS="--jars /Users/example/systemml/system-ml/target/SystemML.jar" - export ZEPPELIN_SPARK_USEHIVECONTEXT=false - -Start Zeppelin using the `zeppelin.sh` script: - - bin/zeppelin.sh - -After opening Zeppelin in a brower, we see the "SystemML - Linear Regression" note in the list of available -Zeppelin notes. - -![Zeppelin Notebook](img/spark-mlcontext-programming-guide/zeppelin-notebook.png "Zeppelin Notebook") - -If we go to the "SystemML - Linear Regression" note, we see that the note consists of several cells of code. - -![Zeppelin 'SystemML - Linear Regression' Note](img/spark-mlcontext-programming-guide/zeppelin-notebook-systemml-linear-regression.png "Zeppelin 'SystemML - Linear Regression' Note") - -Let's briefly consider these cells. - -## Trigger Spark Startup - -This cell triggers Spark to initialize by calling the `SparkContext` `sc` object. Information regarding these startup operations can be viewed in the -console window in which `zeppelin.sh` is running. - -**Cell:** -{% highlight scala %} -// Trigger Spark Startup -sc -{% endhighlight %} - -**Output:** -{% highlight scala %} -res8: org.apache.spark.SparkContext = org.apache.spark.SparkContext@6ce70bf3 -{% endhighlight %} - - -## Generate Linear Regression Test Data - -The Spark `LinearDataGenerator` is used to generate test data for the Spark ML and SystemML linear regression algorithms. 
- -**Cell:** -{% highlight scala %} -// Generate data -import org.apache.spark.mllib.util.LinearDataGenerator -import spark.implicits._ - -val numRows = 10000 -val numCols = 1000 -val rawData = LinearDataGenerator.generateLinearRDD(sc, numRows, numCols, 1).toDF() - -// Repartition into a more parallelism-friendly number of partitions -val data = rawData.repartition(64).cache() -{% endhighlight %} - -**Output:** -{% highlight scala %} -import org.apache.spark.mllib.util.LinearDataGenerator -numRows: Int = 10000 -numCols: Int = 1000 -rawData: org.apache.spark.sql.DataFrame = [label: double, features: vector] -data: org.apache.spark.sql.DataFrame = [label: double, features: vector] -{% endhighlight %} - - -## Train using Spark ML Linear Regression Algorithm for Comparison - -For purpose of comparison, we can train a model using the Spark ML linear regression -algorithm. - -**Cell:** -{% highlight scala %} -// Spark ML -import org.apache.spark.ml.regression.LinearRegression - -// Model Settings -val maxIters = 100 -val reg = 0 -val elasticNetParam = 0 // L2 reg - -// Fit the model -val lr = new LinearRegression() - .setMaxIter(maxIters) - .setRegParam(reg) - .setElasticNetParam(elasticNetParam) -val start = System.currentTimeMillis() -val model = lr.fit(data) -val trainingTime = (System.currentTimeMillis() - start).toDouble / 1000.0 - -// Summarize the model over the training set and gather some metrics -val trainingSummary = model.summary -val r2 = trainingSummary.r2 -val iters = trainingSummary.totalIterations -val trainingTimePerIter = trainingTime / iters -{% endhighlight %} - -**Output:** -{% highlight scala %} -import org.apache.spark.ml.regression.LinearRegression -maxIters: Int = 100 -reg: Int = 0 -elasticNetParam: Int = 0 -lr: org.apache.spark.ml.regression.LinearRegression = linReg_a7f51d676562 -start: Long = 1444672044647 -model: org.apache.spark.ml.regression.LinearRegressionModel = linReg_a7f51d676562 -trainingTime: Double = 12.985 -trainingSummary: org.apache.spark.ml.regression.LinearRegressionTrainingSummary = org.apache.spark.ml.regression.LinearRegressionTrainingSummary@227ba28b -r2: Double = 0.9677118209276552 -iters: Int = 17 -trainingTimePerIter: Double = 0.7638235294117647 -{% endhighlight %} - - -## Spark ML Linear Regression Summary Statistics - -Summary statistics for the Spark ML linear regression algorithm are displayed by this cell. - -**Cell:** -{% highlight scala %} -// Print statistics -println(s"R2: ${r2}") -println(s"Iterations: ${iters}") -println(s"Training time per iter: ${trainingTimePerIter} seconds") -{% endhighlight %} - -**Output:** -{% highlight scala %} -R2: 0.9677118209276552 -Iterations: 17 -Training time per iter: 0.7638235294117647 seconds -{% endhighlight %} - - -## SystemML Linear Regression Algorithm - -The `linearReg` fixed `String` variable is set to -a linear regression algorithm written in DML, SystemML's Declarative Machine Learning language. 
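An editorial note, not part of the removed guide, summarizing what the script below computes (derived from reading the script itself): with regularization constant lambda, it applies the conjugate gradient method to the regularized normal equations `(t(X) %*% X + lambda * I) %*% beta = t(X) %*% y` (with the intercept column left unpenalized when an intercept is added), so each iteration needs only the products `X %*% p` and `t(X) %*% q` and never materializes `t(X) %*% X` explicitly. It also writes out `R2 = 1 - ss_res / ss_avg_tot` and the iteration count as one-cell matrices.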
- - - -**Cell:** -{% highlight scala %} -// SystemML kernels -val linearReg = -""" -# -# THIS SCRIPT SOLVES LINEAR REGRESSION USING THE CONJUGATE GRADIENT ALGORITHM -# -# INPUT PARAMETERS: -# -------------------------------------------------------------------------------------------- -# NAME TYPE DEFAULT MEANING -# -------------------------------------------------------------------------------------------- -# X String --- Matrix X of feature vectors -# Y String --- 1-column Matrix Y of response values -# icpt Int 0 Intercept presence, shifting and rescaling the columns of X: -# 0 = no intercept, no shifting, no rescaling; -# 1 = add intercept, but neither shift nor rescale X; -# 2 = add intercept, shift & rescale X columns to mean = 0, variance = 1 -# reg Double 0.000001 Regularization constant (lambda) for L2-regularization; set to nonzero -# for highly dependend/sparse/numerous features -# tol Double 0.000001 Tolerance (epsilon); conjugate graduent procedure terminates early if -# L2 norm of the beta-residual is less than tolerance * its initial norm -# maxi Int 0 Maximum number of conjugate gradient iterations, 0 = no maximum -# -------------------------------------------------------------------------------------------- -# -# OUTPUT: -# B Estimated regression parameters (the betas) to store -# -# Note: Matrix of regression parameters (the betas) and its size depend on icpt input value: -# OUTPUT SIZE: OUTPUT CONTENTS: HOW TO PREDICT Y FROM X AND B: -# icpt=0: ncol(X) x 1 Betas for X only Y ~ X %*% B[1:ncol(X), 1], or just X %*% B -# icpt=1: ncol(X)+1 x 1 Betas for X and intercept Y ~ X %*% B[1:ncol(X), 1] + B[ncol(X)+1, 1] -# icpt=2: ncol(X)+1 x 2 Col.1: betas for X & intercept Y ~ X %*% B[1:ncol(X), 1] + B[ncol(X)+1, 1] -# Col.2: betas for shifted/rescaled X and intercept -# - -fileX = ""; -fileY = ""; -fileB = ""; - -intercept_status = ifdef ($icpt, 0); # $icpt=0; -tolerance = ifdef ($tol, 0.000001); # $tol=0.000001; -max_iteration = ifdef ($maxi, 0); # $maxi=0; -regularization = ifdef ($reg, 0.000001); # $reg=0.000001; - -X = read (fileX); -y = read (fileY); - -n = nrow (X); -m = ncol (X); -ones_n = matrix (1, rows = n, cols = 1); -zero_cell = matrix (0, rows = 1, cols = 1); - -# Introduce the intercept, shift and rescale the columns of X if needed - -m_ext = m; -if (intercept_status == 1 | intercept_status == 2) # add the intercept column -{ - X = append (X, ones_n); - m_ext = ncol (X); -} - -scale_lambda = matrix (1, rows = m_ext, cols = 1); -if (intercept_status == 1 | intercept_status == 2) -{ - scale_lambda [m_ext, 1] = 0; -} - -if (intercept_status == 2) # scale-&-shift X columns to mean 0, variance 1 -{ # Important assumption: X [, m_ext] = ones_n - avg_X_cols = t(colSums(X)) / n; - var_X_cols = (t(colSums (X ^ 2)) - n * (avg_X_cols ^ 2)) / (n - 1); - is_unsafe = ppred (var_X_cols, 0.0, "<="); - scale_X = 1.0 / sqrt (var_X_cols * (1 - is_unsafe) + is_unsafe); - scale_X [m_ext, 1] = 1; - shift_X = - avg_X_cols * scale_X; - shift_X [m_ext, 1] = 0; -} else { - scale_X = matrix (1, rows = m_ext, cols = 1); - shift_X = matrix (0, rows = m_ext, cols = 1); -} - -# Henceforth, if intercept_status == 2, we use "X %*% (SHIFT/SCALE TRANSFORM)" -# instead of "X". However, in order to preserve the sparsity of X, -# we apply the transform associatively to some other part of the expression -# in which it occurs. 
To avoid materializing a large matrix, we rewrite it: -# -# ssX_A = (SHIFT/SCALE TRANSFORM) %*% A --- is rewritten as: -# ssX_A = diag (scale_X) %*% A; -# ssX_A [m_ext, ] = ssX_A [m_ext, ] + t(shift_X) %*% A; -# -# tssX_A = t(SHIFT/SCALE TRANSFORM) %*% A --- is rewritten as: -# tssX_A = diag (scale_X) %*% A + shift_X %*% A [m_ext, ]; - -lambda = scale_lambda * regularization; -beta_unscaled = matrix (0, rows = m_ext, cols = 1); - -if (max_iteration == 0) { - max_iteration = m_ext; -} -i = 0; - -# BEGIN THE CONJUGATE GRADIENT ALGORITHM -r = - t(X) %*% y; - -if (intercept_status == 2) { - r = scale_X * r + shift_X %*% r [m_ext, ]; -} - -p = - r; -norm_r2 = sum (r ^ 2); -norm_r2_initial = norm_r2; -norm_r2_target = norm_r2_initial * tolerance ^ 2; - -while (i < max_iteration & norm_r2 > norm_r2_target) -{ - if (intercept_status == 2) { - ssX_p = scale_X * p; - ssX_p [m_ext, ] = ssX_p [m_ext, ] + t(shift_X) %*% p; - } else { - ssX_p = p; - } - - q = t(X) %*% (X %*% ssX_p); - - if (intercept_status == 2) { - q = scale_X * q + shift_X %*% q [m_ext, ]; - } - - q = q + lambda * p; - a = norm_r2 / sum (p * q); - beta_unscaled = beta_unscaled + a * p; - r = r + a * q; - old_norm_r2 = norm_r2; - norm_r2 = sum (r ^ 2); - p = -r + (norm_r2 / old_norm_r2) * p; - i = i + 1; -} -# END THE CONJUGATE GRADIENT ALGORITHM - -if (intercept_status == 2) { - beta = scale_X * beta_unscaled; - beta [m_ext, ] = beta [m_ext, ] + t(shift_X) %*% beta_unscaled; -} else { - beta = beta_unscaled; -} - -# Output statistics -avg_tot = sum (y) / n; -ss_tot = sum (y ^ 2); -ss_avg_tot = ss_tot - n * avg_tot ^ 2; -var_tot = ss_avg_tot / (n - 1); -y_residual = y - X %*% beta; -avg_res = sum (y_residual) / n; -ss_res = sum (y_residual ^ 2); -ss_avg_res = ss_res - n * avg_res ^ 2; - -R2_temp = 1 - ss_res / ss_avg_tot -R2 = matrix(R2_temp, rows=1, cols=1) -write(R2, "") - -totalIters = matrix(i, rows=1, cols=1) -write(totalIters, "") - -# Prepare the output matrix -if (intercept_status == 2) { - beta_out = append (beta, beta_unscaled); -} else { - beta_out = beta; -} - -write (beta_out, fileB); -""" -{% endhighlight %} - -**Output:** - -None - - -## Helper Methods - -This cell contains helper methods to return `Double` and `Int` values from output generated by the `MLContext` API. - -**Cell:** -{% highlight scala %} -// Helper functions -import org.apache.sysml.api.MLOutput - -def getScalar(outputs: MLOutput, symbol: String): Any = - outputs.getDF(spark.sqlContext, symbol).first()(1) - -def getScalarDouble(outputs: MLOutput, symbol: String): Double = - getScalar(outputs, symbol).asInstanceOf[Double] - -def getScalarInt(outputs: MLOutput, symbol: String): Int = - getScalarDouble(outputs, symbol).toInt -{% endhighlight %} - -**Output:** -{% highlight scala %} -import org.apache.sysml.api.MLOutput -getScalar: (outputs: org.apache.sysml.api.MLOutput, symbol: String)Any -getScalarDouble: (outputs: org.apache.sysml.api.MLOutput, symbol: String)Double -getScalarInt: (outputs: org.apache.sysml.api.MLOutput, symbol: String)Int -{% endhighlight %} - - -## Convert DataFrame to Binary-Block Format - -SystemML uses a binary-block format for matrix data representation. This cell -explicitly converts the `DataFrame` `data` object to a binary-block `features` matrix -and single-column `label` matrix, both represented by the -`JavaPairRDD[MatrixIndexes, MatrixBlock]` datatype. 
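An editorial aside (not part of the removed guide) on the `count()` values reported below: with 1000x1000 blocks, the 10,000 x 1,000 features matrix tiles into 10 x 1 = 10 blocks and the 10,000 x 1 label matrix into 10 x 1 = 10 blocks, which is consistent with `cnt1` and `cnt2` both being 10.

{% highlight scala %}
// Editorial sketch (not in the removed guide): expected block counts for the
// conversion below, assuming 1000 x 1000 blocks.
val rowBlocks        = math.ceil(10000 / 1000.0).toLong // 10
val featureColBlocks = math.ceil( 1000 / 1000.0).toLong // 1
val labelColBlocks   = math.ceil(    1 / 1000.0).toLong // 1
rowBlocks * featureColBlocks // 10 -- matches cnt1 below
rowBlocks * labelColBlocks   // 10 -- matches cnt2 below
{% endhighlight %}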
- - -**Cell:** -{% highlight scala %} -// Imports -import org.apache.sysml.api.MLContext -import org.apache.sysml.runtime.instructions.spark.utils.{RDDConverterUtilsExt => RDDConverterUtils} -import org.apache.sysml.runtime.matrix.MatrixCharacteristics; - -// Create SystemML context -val ml = new MLContext(sc) - -// Convert data to proper format -val mcX = new MatrixCharacteristics(numRows, numCols, 1000, 1000) -val mcY = new MatrixCharacteristics(numRows, 1, 1000, 1000) -val X = RDDConverterUtils.vectorDataFrameToBinaryBlock(sc, data, mcX, false, "features") -val y = RDDConverterUtils.dataFrameToBinaryBlock(sc, data.select("label"), mcY, false) -// val y = data.select("label") - -// Cache -val X2 = X.cache() -val y2 = y.cache() -val cnt1 = X2.count() -val cnt2 = y2.count() -{% endhighlight %} - -**Output:** -{% highlight scala %} -import org.apache.sysml.api.MLContext -import org.apache.sysml.runtime.instructions.spark.utils.{RDDConverterUtilsExt=>RDDConverterUtils} -import org.apache.sysml.runtime.matrix.MatrixCharacteristics -ml: org.apache.sysml.api.MLContext = org.apache.sysml.api.MLContext@38d59245 -mcX: org.apache.sysml.runtime.matrix.MatrixCharacteristics = [10000 x 1000, nnz=-1, blocks (1000 x 1000)] -mcY: org.apache.sysml.runtime.matrix.MatrixCharacteristics = [10000 x 1, nnz=-1, blocks (1000 x 1000)] -X: org.apache.spark.api.java.JavaPairRDD[org.apache.sysml.runtime.matrix.data.MatrixIndexes,org.apache.sysml.runtime.matrix.data.MatrixBlock] = org.apache.spark.api.java.JavaPairRDD@b5a86e3 -y: org.apache.spark.api.java.JavaPairRDD[org.apache.sysml.runtime.matrix.data.MatrixIndexes,org.apache.sysml.runtime.matrix.data.MatrixBlock] = org.apache.spark.api.java.JavaPairRDD@56377665 -X2: org.apache.spark.api.java.JavaPairRDD[org.apache.sysml.runtime.matrix.data.MatrixIndexes,org.apache.sysml.runtime.matrix.data.MatrixBlock] = org.apache.spark.api.java.JavaPairRDD@650f29d2 -y2: org.apache.spark.api.java.JavaPairRDD[org.apache.sysml.runtime.matrix.data.MatrixIndexes,org.apache.sysml.runtime.matrix.data.MatrixBlock] = org.apache.spark.api.java.JavaPairRDD@334857a8 -cnt1: Long = 10 -cnt2: Long = 10 -{% endhighlight %} - - -## Train using SystemML Linear Regression Algorithm - -Now, we can train our model using the SystemML linear regression algorithm. We register the features matrix `X` and the label matrix `y` as inputs. We register the `beta_out` matrix, -`R2`, and `totalIters` as outputs. 
- -**Cell:** -{% highlight scala %} -// Register inputs & outputs -ml.reset() -ml.registerInput("X", X, numRows, numCols) -ml.registerInput("y", y, numRows, 1) -// ml.registerInput("y", y) -ml.registerOutput("beta_out") -ml.registerOutput("R2") -ml.registerOutput("totalIters") - -// Run the script -val start = System.currentTimeMillis() -val outputs = ml.executeScript(linearReg) -val trainingTime = (System.currentTimeMillis() - start).toDouble / 1000.0 - -// Get outputs -val B = outputs.getDF(spark.sqlContext, "beta_out").sort("ID").drop("ID") -val r2 = getScalarDouble(outputs, "R2") -val iters = getScalarInt(outputs, "totalIters") -val trainingTimePerIter = trainingTime / iters -{% endhighlight %} - -**Output:** -{% highlight scala %} -start: Long = 1444672090620 -outputs: org.apache.sysml.api.MLOutput = org.apache.sysml.api.MLOutput@5d2c22d0 -trainingTime: Double = 1.176 -B: org.apache.spark.sql.DataFrame = [C1: double] -r2: Double = 0.9677079547216473 -iters: Int = 12 -trainingTimePerIter: Double = 0.09799999999999999 -{% endhighlight %} - - -## SystemML Linear Regression Summary Statistics - -SystemML linear regression summary statistics are displayed by this cell. - -**Cell:** -{% highlight scala %} -// Print statistics -println(s"R2: ${r2}") -println(s"Iterations: ${iters}") -println(s"Training time per iter: ${trainingTimePerIter} seconds") -B.describe().show() -{% endhighlight %} - -**Output:** -{% highlight scala %} -R2: 0.9677079547216473 -Iterations: 12 -Training time per iter: 0.2334166666666667 seconds -+-------+-------------------+ -|summary| C1| -+-------+-------------------+ -| count| 1000| -| mean| 0.0184500840658385| -| stddev| 0.2764750319432085| -| min|-0.5426068958986378| -| max| 0.5225309861616542| -+-------+-------------------+ -{% endhighlight %} - - ---- - -# Jupyter (PySpark) Notebook Example - Poisson Nonnegative Matrix Factorization - OLD API - -### ** **NOTE: This API is old and has been deprecated.** ** -**Please use the [new MLContext API](spark-mlcontext-programming-guide#jupyter-pyspark-notebook-example---poisson-nonnegative-matrix-factorization) instead.** - -Here, we'll explore the use of SystemML via PySpark in a [Jupyter notebook](http://jupyter.org/). -This Jupyter notebook example can be nicely viewed in a rendered state -[on GitHub](https://github.com/apache/incubator-systemml/blob/master/samples/jupyter-notebooks/SystemML-PySpark-Recommendation-Demo.ipynb), -and can be [downloaded here](https://raw.githubusercontent.com/apache/incubator-systemml/master/samples/jupyter-notebooks/SystemML-PySpark-Recommendation-Demo.ipynb) to a directory of your choice. - -From the directory with the downloaded notebook, start Jupyter with PySpark: - - PYSPARK_DRIVER_PYTHON=jupyter PYSPARK_DRIVER_PYTHON_OPTS="notebook" $SPARK_HOME/bin/pyspark --master local[*] --driver-class-path $SYSTEMML_HOME/SystemML.jar - -This will open Jupyter in a browser: - -![Jupyter Notebook](img/spark-mlcontext-programming-guide/jupyter1.png "Jupyter Notebook") - -We can then open up the `SystemML-PySpark-Recommendation-Demo` notebook: - -![Jupyter Notebook](img/spark-mlcontext-programming-guide/jupyter2.png "Jupyter Notebook") - -## Set up the notebook and download the data - -{% highlight python %} -%load_ext autoreload -%autoreload 2 -%matplotlib inline - -# Add SystemML PySpark API file. 
-sc.addPyFile("https://raw.githubusercontent.com/apache/incubator-systemml/3d5f9b11741f6d6ecc6af7cbaa1069cde32be838/src/main/java/org/apache/sysml/api/python/SystemML.py") - -import numpy as np -import matplotlib.pyplot as plt -plt.rcParams['figure.figsize'] = (10, 6) -{% endhighlight %} - -{% highlight python %} -%%sh -# Download dataset -curl -O http://snap.stanford.edu/data/amazon0601.txt.gz -gunzip amazon0601.txt.gz -{% endhighlight %} - -## Use PySpark to load the data in as a Spark DataFrame - -{% highlight python %} -# Load data -import pyspark.sql.functions as F -dataPath = "amazon0601.txt" - -X_train = (sc.textFile(dataPath) - .filter(lambda l: not l.startswith("#")) - .map(lambda l: l.split("\t")) - .map(lambda prods: (int(prods[0]), int(prods[1]), 1.0)) - .toDF(("prod_i", "prod_j", "x_ij")) - .filter("prod_i < 500 AND prod_j < 500") # Filter for memory constraints - .cache()) - -max_prod_i = X_train.select(F.max("prod_i")).first()[0] -max_prod_j = X_train.select(F.max("prod_j")).first()[0] -numProducts = max(max_prod_i, max_prod_j) + 1 # 0-based indexing -print("Total number of products: {}".format(numProducts)) -{% endhighlight %} - -## Create a SystemML MLContext object - -{% highlight python %} -# Create SystemML MLContext -from SystemML import MLContext -ml = MLContext(sc) -{% endhighlight %} - -## Define a kernel for Poisson nonnegative matrix factorization (PNMF) in DML - -{% highlight python %} -# Define PNMF kernel in SystemML's DSL using the R-like syntax for PNMF -pnmf = """ -# data & args -X = read($X) -X = X+1 # change product IDs to be 1-based, rather than 0-based -V = table(X[,1], X[,2]) -size = ifdef($size, -1) -if(size > -1) { - V = V[1:size,1:size] -} -max_iteration = as.integer($maxiter) -rank = as.integer($rank) - -n = nrow(V) -m = ncol(V) -range = 0.01 -W = Rand(rows=n, cols=rank, min=0, max=range, pdf="uniform") -H = Rand(rows=rank, cols=m, min=0, max=range, pdf="uniform") -losses = matrix(0, rows=max_iteration, cols=1) - -# run PNMF -i=1 -while(i <= max_iteration) { - # update params - H = (H * (t(W) %*% (V/(W%*%H))))/t(colSums(W)) - W = (W * ((V/(W%*%H)) %*% t(H)))/t(rowSums(H)) - - # compute loss - losses[i,] = -1 * (sum(V*log(W%*%H)) - as.scalar(colSums(W)%*%rowSums(H))) - i = i + 1; -} - -# write outputs -write(losses, $lossout) -write(W, $Wout) -write(H, $Hout) -""" -{% endhighlight %} - -## Execute the algorithm - -{% highlight python %} -# Run the PNMF script on SystemML with Spark -ml.reset() -outputs = ml.executeScript(pnmf, {"X": X_train, "maxiter": 100, "rank": 10}, ["W", "H", "losses"]) -{% endhighlight %} - -## Retrieve the losses during training and plot them - -{% highlight python %} -# Plot training loss over time -losses = outputs.getDF(spark.sqlContext, "losses") -xy = losses.sort(losses.ID).map(lambda r: (r[0], r[1])).collect() -x, y = zip(*xy) -plt.plot(x, y) -plt.xlabel('Iteration') -plt.ylabel('Loss') -plt.title('PNMF Training Loss') -{% endhighlight %} - -![Jupyter Loss Graph](img/spark-mlcontext-programming-guide/jupyter_loss_graph.png "Jupyter Loss Graph") - ---- - # Recommended Spark Configuration Settings For best performance, we recommend setting the following flags when running SystemML with Spark: