Repository: incubator-systemml
Updated Branches:
  refs/heads/gh-pages cb6f8456f -> bfb93b03c


[SYSTEMML-1181] Remove old Spark MLContext API documentation

Closes #377.


Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/bfb93b03
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/bfb93b03
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/bfb93b03

Branch: refs/heads/gh-pages
Commit: bfb93b03c71493ac210a2b7594eb2a3d3c15fcb5
Parents: cb6f845
Author: Felix Schueler <felix.schue...@ibm.com>
Authored: Fri Feb 3 18:06:10 2017 -0800
Committer: Deron Eriksson <de...@us.ibm.com>
Committed: Fri Feb 3 18:06:10 2017 -0800

----------------------------------------------------------------------
 spark-mlcontext-programming-guide.md | 1057 -----------------------------
 1 file changed, 1057 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/bfb93b03/spark-mlcontext-programming-guide.md
----------------------------------------------------------------------
diff --git a/spark-mlcontext-programming-guide.md 
b/spark-mlcontext-programming-guide.md
index 45c0091..e5df11f 100644
--- a/spark-mlcontext-programming-guide.md
+++ b/spark-mlcontext-programming-guide.md
@@ -34,10 +34,6 @@ The Spark `MLContext` API offers a programmatic interface for interacting with SystemML from Spark using languages
 such as Scala, Java, and Python. As a result, it offers a convenient way to interact with SystemML from the Spark
 Shell and from Notebooks such as Jupyter and Zeppelin.
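
For orientation, here is a minimal sketch of the new API (assuming the `org.apache.sysml.api.mlcontext` package and its `ScriptFactory` helpers that this guide documents):

{% highlight scala %}
import org.apache.sysml.api.mlcontext._
import org.apache.sysml.api.mlcontext.ScriptFactory._

// create an MLContext backed by the SparkContext and run a trivial DML script
val ml = new MLContext(sc)
val helloScript = dml("print('hello world')")
ml.execute(helloScript)
{% endhighlight %}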
 
-**NOTE: The MLContext API has been redesigned for future SystemML releases. The old API is available
-in previous versions of SystemML but is deprecated and will be removed soon, so please migrate to the new API.**
-
-
 # Spark Shell Example
 
 ## Start Spark Shell with SystemML
@@ -1822,1059 +1818,6 @@ plt.title('PNMF Training Loss')
 
 ---
 
-# Spark Shell Example - OLD API
-
-### **NOTE: This API is old and has been deprecated.**
-**Please use the [new MLContext API](spark-mlcontext-programming-guide#spark-shell-example) instead.**
-
-## Start Spark Shell with SystemML
-
-To use SystemML with the Spark Shell, the SystemML jar can be referenced using the Spark Shell's `--jars` option.
-Instructions to build the SystemML jar can be found in the [SystemML GitHub README](https://github.com/apache/incubator-systemml).
-
-{% highlight bash %}
-./bin/spark-shell --executor-memory 4G --driver-memory 4G --jars SystemML.jar
-{% endhighlight %}
-
-Here is an example of Spark Shell with SystemML and YARN.
-
-{% highlight bash %}
-./bin/spark-shell --master yarn-client --num-executors 3 --driver-memory 5G --executor-memory 5G --executor-cores 4 --jars SystemML.jar
-{% endhighlight %}
-
-
-## Create MLContext
-
-An `MLContext` object can be created by passing its constructor a reference to the `SparkContext`.
-
-<div class="codetabs">
-
-<div data-lang="Spark Shell" markdown="1">
-{% highlight scala %}
-scala> import org.apache.sysml.api.MLContext
-import org.apache.sysml.api.MLContext
-
-scala> val ml = new MLContext(sc)
-ml: org.apache.sysml.api.MLContext = org.apache.sysml.api.MLContext@33e38c6b
-{% endhighlight %}
-</div>
-
-<div data-lang="Statements" markdown="1">
-{% highlight scala %}
-import org.apache.sysml.api.MLContext
-val ml = new MLContext(sc)
-{% endhighlight %}
-</div>
-
-</div>
-
-
-## Create DataFrame
-
-For demonstration purposes, we'll create a `DataFrame` consisting of 100,000 rows and 1,000 columns
-of random `double`s.
-
-<div class="codetabs">
-
-<div data-lang="Spark Shell" markdown="1">
-{% highlight scala %}
-scala> import org.apache.spark.sql._
-import org.apache.spark.sql._
-
-scala> import org.apache.spark.sql.types.{StructType,StructField,DoubleType}
-import org.apache.spark.sql.types.{StructType, StructField, DoubleType}
-
-scala> import scala.util.Random
-import scala.util.Random
-
-scala> val numRows = 100000
-numRows: Int = 100000
-
-scala> val numCols = 1000
-numCols: Int = 1000
-
-scala> val data = sc.parallelize(0 to numRows-1).map { _ => Row.fromSeq(Seq.fill(numCols)(Random.nextDouble)) }
-data: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[1] at map at <console>:33
-
-scala> val schema = StructType((0 to numCols-1).map { i => StructField("C" + i, DoubleType, true) } )
-schema: org.apache.spark.sql.types.StructType = StructType(StructField(C0,DoubleType,true), StructField(C1,DoubleType,true), StructField(C2,DoubleType,true), StructField(C3,DoubleType,true), StructField(C4,DoubleType,true), StructField(C5,DoubleType,true), StructField(C6,DoubleType,true), StructField(C7,DoubleType,true), StructField(C8,DoubleType,true), StructField(C9,DoubleType,true), StructField(C10,DoubleType,true), StructField(C11,DoubleType,true), StructField(C12,DoubleType,true), StructField(C13,DoubleType,true), StructField(C14,DoubleType,true), StructField(C15,DoubleType,true), StructField(C16,DoubleType,true), StructField(C17,DoubleType,true), StructField(C18,DoubleType,true), StructField(C19,DoubleType,true), StructField(C20,DoubleType,true), StructField(C21,DoubleType,true), ...
-
-scala> val df = spark.createDataFrame(data, schema)
-df: org.apache.spark.sql.DataFrame = [C0: double, C1: double, C2: double, C3: double, C4: double, C5: double, C6: double, C7: double, C8: double, C9: double, C10: double, C11: double, C12: double, C13: double, C14: double, C15: double, C16: double, C17: double, C18: double, C19: double, C20: double, C21: double, C22: double, C23: double, C24: double, C25: double, C26: double, C27: double, C28: double, C29: double, C30: double, C31: double, C32: double, C33: double, C34: double, C35: double, C36: double, C37: double, C38: double, C39: double, C40: double, C41: double, C42: double, C43: double, C44: double, C45: double, C46: double, C47: double, C48: double, C49: double, C50: double, C51: double, C52: double, C53: double, C54: double, C55: double, C56: double, C57: double, C58: double, C5...
-
-{% endhighlight %}
-</div>
-
-<div data-lang="Statements" markdown="1">
-{% highlight scala %}
-import org.apache.spark.sql._
-import org.apache.spark.sql.types.{StructType,StructField,DoubleType}
-import scala.util.Random
-val numRows = 100000
-val numCols = 1000
-val data = sc.parallelize(0 to numRows-1).map { _ => Row.fromSeq(Seq.fill(numCols)(Random.nextDouble)) }
-val schema = StructType((0 to numCols-1).map { i => StructField("C" + i, DoubleType, true) } )
-val df = spark.createDataFrame(data, schema)
-{% endhighlight %}
-</div>
-
-</div>
-
-
-## Helper Methods
-
-For convenience, we'll create some helper methods. The SystemML output data is 
encapsulated in
-an `MLOutput` object. The `getScalar()` method extracts a scalar value from a 
`DataFrame` returned by
-`MLOutput`. The `getScalarDouble()` method returns such a value as a `Double`, 
and the
-`getScalarInt()` method returns such a value as an `Int`.
-
-<div class="codetabs">
-
-<div data-lang="Spark Shell" markdown="1">
-{% highlight scala %}
-scala> import org.apache.sysml.api.MLOutput
-import org.apache.sysml.api.MLOutput
-
-scala> def getScalar(outputs: MLOutput, symbol: String): Any =
-     | outputs.getDF(spark.sqlContext, symbol).first()(1)
-getScalar: (outputs: org.apache.sysml.api.MLOutput, symbol: String)Any
-
-scala> def getScalarDouble(outputs: MLOutput, symbol: String): Double =
-     | getScalar(outputs, symbol).asInstanceOf[Double]
-getScalarDouble: (outputs: org.apache.sysml.api.MLOutput, symbol: String)Double
-
-scala> def getScalarInt(outputs: MLOutput, symbol: String): Int =
-     | getScalarDouble(outputs, symbol).toInt
-getScalarInt: (outputs: org.apache.sysml.api.MLOutput, symbol: String)Int
-
-{% endhighlight %}
-</div>
-
-<div data-lang="Statements" markdown="1">
-{% highlight scala %}
-import org.apache.sysml.api.MLOutput
-def getScalar(outputs: MLOutput, symbol: String): Any =
-outputs.getDF(spark.sqlContext, symbol).first()(1)
-def getScalarDouble(outputs: MLOutput, symbol: String): Double =
-getScalar(outputs, symbol).asInstanceOf[Double]
-def getScalarInt(outputs: MLOutput, symbol: String): Int =
-getScalarDouble(outputs, symbol).toInt
-
-{% endhighlight %}
-</div>
-
-</div>
-
-
-## Convert DataFrame to Binary-Block Matrix
-
-SystemML is optimized to operate on a binary-block format for matrix representation. For large
-datasets, conversion from DataFrame to binary-block can require a significant amount of time.
-Explicit DataFrame to binary-block conversion allows algorithm performance to be measured separately
-from data conversion time.
-
-The SystemML binary-block matrix representation can be thought of as a 
two-dimensional array of blocks, where each block
-consists of a number of rows and columns. In this example, we specify a matrix 
consisting
-of blocks of size 1000x1000. The experimental `dataFrameToBinaryBlock()` 
method of `RDDConverterUtilsExt` is used
-to convert the `DataFrame df` to a SystemML binary-block matrix, which is 
represented by the datatype
-`JavaPairRDD[MatrixIndexes, MatrixBlock]`.
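-For example, the 100,000 x 1,000 matrix used here, stored in 1000x1000 blocks, forms a 100 x 1 grid of blocks.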
-
-<div class="codetabs">
-
-<div data-lang="Spark Shell" markdown="1">
-{% highlight scala %}
-scala> import org.apache.sysml.runtime.instructions.spark.utils.{RDDConverterUtilsExt => RDDConverterUtils}
-import org.apache.sysml.runtime.instructions.spark.utils.{RDDConverterUtilsExt=>RDDConverterUtils}
-
-scala> import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
-import org.apache.sysml.runtime.matrix.MatrixCharacteristics
-
-scala> val numRowsPerBlock = 1000
-numRowsPerBlock: Int = 1000
-
-scala> val numColsPerBlock = 1000
-numColsPerBlock: Int = 1000
-
-scala> val mc = new MatrixCharacteristics(numRows, numCols, numRowsPerBlock, numColsPerBlock)
-mc: org.apache.sysml.runtime.matrix.MatrixCharacteristics = [100000 x 1000, nnz=-1, blocks (1000 x 1000)]
-
-scala> val sysMlMatrix = RDDConverterUtils.dataFrameToBinaryBlock(sc, df, mc, false)
-sysMlMatrix: org.apache.spark.api.java.JavaPairRDD[org.apache.sysml.runtime.matrix.data.MatrixIndexes,org.apache.sysml.runtime.matrix.data.MatrixBlock] = org.apache.spark.api.java.JavaPairRDD@2bce3248
-
-{% endhighlight %}
-</div>
-
-<div data-lang="Statements" markdown="1">
-{% highlight scala %}
-import org.apache.sysml.runtime.instructions.spark.utils.{RDDConverterUtilsExt => RDDConverterUtils}
-import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
-val numRowsPerBlock = 1000
-val numColsPerBlock = 1000
-val mc = new MatrixCharacteristics(numRows, numCols, numRowsPerBlock, numColsPerBlock)
-val sysMlMatrix = RDDConverterUtils.dataFrameToBinaryBlock(sc, df, mc, false)
-
-{% endhighlight %}
-</div>
-
-</div>
-
-
-## DML Script
-
-For this example, we will utilize the following DML Script called `shape.dml` 
that reads in a matrix and outputs the number of rows and the
-number of columns, each represented as a matrix.
-
-{% highlight r %}
-X = read($Xin)
-m = matrix(nrow(X), rows=1, cols=1)
-n = matrix(ncol(X), rows=1, cols=1)
-write(m, $Mout)
-write(n, $Nout)
-{% endhighlight %}
-
-
-## Execute Script
-
-Let's execute our DML script, as shown in the example below. The call to 
`reset()` of `MLContext` is not necessary here, but this method should
-be called if you need to reset inputs and outputs or if you would like to call 
`execute()` with a different script.
-
-An example of registering the `DataFrame df` as an input to the `X` variable is shown but commented out. If a
-DataFrame is registered directly, it will implicitly be converted to SystemML's binary-block format. However, since
-we've already explicitly converted the DataFrame to the binary-block matrix `sysMlMatrix`, we register it as the
-input to the `X` variable. We register the `m` and `n` variables as outputs.
-
-When SystemML is executed via `DMLScript` (such as in Standalone Mode), inputs are supplied as either command-line
-named arguments or positional arguments. These inputs are specified in DML scripts by prepending them with a `$`.
-Values are read from or written to files using `read`/`write` (DML) and `load`/`save` (PyDML) statements. When
-utilizing the `MLContext` API, inputs and outputs can be other data representations, such as `DataFrame`s. The input
-and output data are bound to DML variables. The named arguments in the `shape.dml` script do not have default values
-set for them, so we create a `Map` to map the required named arguments to blank `String`s so that the script can
-pass validation.
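
As an illustration of that command-line style (a hypothetical invocation; the file names here are made up), named arguments could be supplied to `DMLScript` roughly as follows:

{% highlight bash %}
hadoop jar SystemML.jar -f shape.dml -nvargs Xin=X.mtx Mout=m.csv Nout=n.csv
{% endhighlight %}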
-
-The `shape.dml` script is executed by the call to `execute()`, where we supply the `Map` of required named arguments.
-The execution results are returned as the `MLOutput` object `outputs`. The number of rows is obtained by calling the
-`getScalarInt()` helper method with the `outputs` object and `"m"`. The number of columns is retrieved by calling
-`getScalarInt()` with `outputs` and `"n"`.
-
-<div class="codetabs">
-
-<div data-lang="Spark Shell" markdown="1">
-{% highlight scala %}
-scala> ml.reset()
-
-scala> //ml.registerInput("X", df) // implicit conversion of DataFrame to binary-block
-
-scala> ml.registerInput("X", sysMlMatrix, numRows, numCols)
-
-scala> ml.registerOutput("m")
-
-scala> ml.registerOutput("n")
-
-scala> val nargs = Map("Xin" -> " ", "Mout" -> " ", "Nout" -> " ")
-nargs: scala.collection.immutable.Map[String,String] = Map(Xin -> " ", Mout -> " ", Nout -> " ")
-
-scala> val outputs = ml.execute("shape.dml", nargs)
-15/10/12 16:29:15 WARN : Your hostname, derons-mbp.usca.ibm.com resolves to a loopback/non-reachable address: 127.0.0.1, but we couldn't find any external IP address!
-15/10/12 16:29:15 WARN OptimizerUtils: Auto-disable multi-threaded text read for 'text' and 'csv' due to thread contention on JRE < 1.8 (java.version=1.7.0_80).
-outputs: org.apache.sysml.api.MLOutput = org.apache.sysml.api.MLOutput@4d424743
-
-scala> val m = getScalarInt(outputs, "m")
-m: Int = 100000
-
-scala> val n = getScalarInt(outputs, "n")
-n: Int = 1000
-
-{% endhighlight %}
-</div>
-
-<div data-lang="Statements" markdown="1">
-{% highlight scala %}
-ml.reset()
-//ml.registerInput("X", df) // implicit conversion of DataFrame to binary-block
-ml.registerInput("X", sysMlMatrix, numRows, numCols)
-ml.registerOutput("m")
-ml.registerOutput("n")
-val nargs = Map("Xin" -> " ", "Mout" -> " ", "Nout" -> " ")
-val outputs = ml.execute("shape.dml", nargs)
-val m = getScalarInt(outputs, "m")
-val n = getScalarInt(outputs, "n")
-
-{% endhighlight %}
-</div>
-
-</div>
-
-
-## DML Script as String
-
-The `MLContext` API allows a DML script to be specified
-as a `String`. Here, we specify a DML script as a fixed `String` variable 
called `minMaxMeanScript`.
-This DML will find the minimum, maximum, and mean value of a matrix.
-
-<div class="codetabs">
-
-<div data-lang="Spark Shell" markdown="1">
-{% highlight scala %}
-scala> val minMaxMeanScript: String =
-     | """
-     | Xin = read(" ")
-     | minOut = matrix(min(Xin), rows=1, cols=1)
-     | maxOut = matrix(max(Xin), rows=1, cols=1)
-     | meanOut = matrix(mean(Xin), rows=1, cols=1)
-     | write(minOut, " ")
-     | write(maxOut, " ")
-     | write(meanOut, " ")
-     | """
-minMaxMeanScript: String =
-"
-Xin = read(" ")
-minOut = matrix(min(Xin), rows=1, cols=1)
-maxOut = matrix(max(Xin), rows=1, cols=1)
-meanOut = matrix(mean(Xin), rows=1, cols=1)
-write(minOut, " ")
-write(maxOut, " ")
-write(meanOut, " ")
-"
-
-{% endhighlight %}
-</div>
-
-<div data-lang="Statements" markdown="1">
-{% highlight scala %}
-val minMaxMeanScript: String =
-"""
-Xin = read(" ")
-minOut = matrix(min(Xin), rows=1, cols=1)
-maxOut = matrix(max(Xin), rows=1, cols=1)
-meanOut = matrix(mean(Xin), rows=1, cols=1)
-write(minOut, " ")
-write(maxOut, " ")
-write(meanOut, " ")
-"""
-
-{% endhighlight %}
-</div>
-
-</div>
-
-## Scala Wrapper for DML
-
-We can create a Scala wrapper for our invocation of the `minMaxMeanScript` DML 
`String`. The `minMaxMean()` method
-takes a `JavaPairRDD[MatrixIndexes, MatrixBlock]` parameter, which is a 
SystemML binary-block matrix representation.
-It also takes a `rows` parameter indicating the number of rows in the matrix, 
a `cols` parameter indicating the number
-of columns in the matrix, and an `MLContext` parameter. The `minMaxMean()` 
method
-returns a tuple consisting of the minimum value in the matrix, the maximum 
value in the matrix, and the computed
-mean value of the matrix.
-
-<div class="codetabs">
-
-<div data-lang="Spark Shell" markdown="1">
-{% highlight scala %}
-scala> import org.apache.sysml.runtime.matrix.data.MatrixIndexes
-import org.apache.sysml.runtime.matrix.data.MatrixIndexes
-
-scala> import org.apache.sysml.runtime.matrix.data.MatrixBlock
-import org.apache.sysml.runtime.matrix.data.MatrixBlock
-
-scala> import org.apache.spark.api.java.JavaPairRDD
-import org.apache.spark.api.java.JavaPairRDD
-
-scala> def minMaxMean(mat: JavaPairRDD[MatrixIndexes, MatrixBlock], rows: Int, cols: Int, ml: MLContext): (Double, Double, Double) = {
-     | ml.reset()
-     | ml.registerInput("Xin", mat, rows, cols)
-     | ml.registerOutput("minOut")
-     | ml.registerOutput("maxOut")
-     | ml.registerOutput("meanOut")
-     | val outputs = ml.executeScript(minMaxMeanScript)
-     | val minOut = getScalarDouble(outputs, "minOut")
-     | val maxOut = getScalarDouble(outputs, "maxOut")
-     | val meanOut = getScalarDouble(outputs, "meanOut")
-     | (minOut, maxOut, meanOut)
-     | }
-minMaxMean: (mat: org.apache.spark.api.java.JavaPairRDD[org.apache.sysml.runtime.matrix.data.MatrixIndexes,org.apache.sysml.runtime.matrix.data.MatrixBlock], rows: Int, cols: Int, ml: org.apache.sysml.api.MLContext)(Double, Double, Double)
-
-{% endhighlight %}
-</div>
-
-<div data-lang="Statements" markdown="1">
-{% highlight scala %}
-import org.apache.sysml.runtime.matrix.data.MatrixIndexes
-import org.apache.sysml.runtime.matrix.data.MatrixBlock
-import org.apache.spark.api.java.JavaPairRDD
-def minMaxMean(mat: JavaPairRDD[MatrixIndexes, MatrixBlock], rows: Int, cols: Int, ml: MLContext): (Double, Double, Double) = {
-ml.reset()
-ml.registerInput("Xin", mat, rows, cols)
-ml.registerOutput("minOut")
-ml.registerOutput("maxOut")
-ml.registerOutput("meanOut")
-val outputs = ml.executeScript(minMaxMeanScript)
-val minOut = getScalarDouble(outputs, "minOut")
-val maxOut = getScalarDouble(outputs, "maxOut")
-val meanOut = getScalarDouble(outputs, "meanOut")
-(minOut, maxOut, meanOut)
-}
-
-{% endhighlight %}
-</div>
-
-</div>
-
-
-## Invoking DML via Scala Wrapper
-
-Here, we invoke `minMaxMeanScript` using our `minMaxMean()` Scala wrapper 
method. It returns a tuple
-consisting of the minimum value in the matrix, the maximum value in the 
matrix, and the mean value of the matrix.
-
-<div class="codetabs">
-
-<div data-lang="Spark Shell" markdown="1">
-{% highlight scala %}
-scala> val (min, max, mean) = minMaxMean(sysMlMatrix, numRows, numCols, ml)
-15/10/13 14:33:11 WARN OptimizerUtils: Auto-disable multi-threaded text read for 'text' and 'csv' due to thread contention on JRE < 1.8 (java.version=1.7.0_80).
-min: Double = 5.378949397005783E-9
-max: Double = 0.9999999934660398
-mean: Double = 0.499988222338507
-
-{% endhighlight %}
-</div>
-
-<div data-lang="Statements" markdown="1">
-{% highlight scala %}
-val (min, max, mean) = minMaxMean(sysMlMatrix, numRows, numCols, ml)
-
-{% endhighlight %}
-</div>
-
-</div>
-
----
-
-# Zeppelin Notebook Example - Linear Regression Algorithm - OLD API
-
-### **NOTE: This API is old and has been deprecated.**
-**Please use the [new MLContext API](spark-mlcontext-programming-guide#spark-shell-example) instead.**
-
-Next, we'll consider an example of a SystemML linear regression algorithm run from Spark through an Apache Zeppelin
-notebook. Instructions to clone and build Zeppelin can be found at the [Apache Zeppelin
-GitHub](https://github.com/apache/incubator-zeppelin) site. This example will also look at the Spark ML linear
-regression algorithm.
-
-This Zeppelin notebook example can be imported by choosing `Import note` -> `Add from URL` from the Zeppelin main
-page and inserting the following URL:
-
-    https://raw.githubusercontent.com/apache/incubator-systemml/master/samples/zeppelin-notebooks/2AZ2AQ12B/note.json
-
-Alternatively download <a href="https://raw.githubusercontent.com/apache/incubator-systemml/master/samples/zeppelin-notebooks/2AZ2AQ12B/note.json" download="note.json">note.json</a>, then import it by choosing `Import note` -> `Choose a JSON here` from the Zeppelin main page.
-
-A `conf/zeppelin-env.sh` file is created based on 
`conf/zeppelin-env.sh.template`. For
-this demonstration, it features `SPARK_HOME`, `SPARK_SUBMIT_OPTIONS`, and 
`ZEPPELIN_SPARK_USEHIVECONTEXT`
-environment variables:
-
-       export SPARK_HOME=/Users/example/spark-1.5.1-bin-hadoop2.6
-       export SPARK_SUBMIT_OPTIONS="--jars /Users/example/systemml/system-ml/target/SystemML.jar"
-       export ZEPPELIN_SPARK_USEHIVECONTEXT=false
-
-Start Zeppelin using the `zeppelin.sh` script:
-
-       bin/zeppelin.sh
-
-After opening Zeppelin in a browser, we see the "SystemML - Linear Regression" note in the list of available
-Zeppelin notes.
-
-![Zeppelin Notebook](img/spark-mlcontext-programming-guide/zeppelin-notebook.png "Zeppelin Notebook")
-
-If we go to the "SystemML - Linear Regression" note, we see that the note 
consists of several cells of code.
-
-![Zeppelin 'SystemML - Linear Regression' Note](img/spark-mlcontext-programming-guide/zeppelin-notebook-systemml-linear-regression.png "Zeppelin 'SystemML - Linear Regression' Note")
-
-Let's briefly consider these cells.
-
-## Trigger Spark Startup
-
-This cell triggers Spark to initialize by referencing the `SparkContext` object `sc`. Information regarding these
-startup operations can be viewed in the console window in which `zeppelin.sh` is running.
-
-**Cell:**
-{% highlight scala %}
-// Trigger Spark Startup
-sc
-{% endhighlight %}
-
-**Output:**
-{% highlight scala %}
-res8: org.apache.spark.SparkContext = org.apache.spark.SparkContext@6ce70bf3
-{% endhighlight %}
-
-
-## Generate Linear Regression Test Data
-
-The Spark `LinearDataGenerator` is used to generate test data for the Spark ML 
and SystemML linear regression algorithms.
-
-**Cell:**
-{% highlight scala %}
-// Generate data
-import org.apache.spark.mllib.util.LinearDataGenerator
-import spark.implicits._
-
-val numRows = 10000
-val numCols = 1000
-val rawData = LinearDataGenerator.generateLinearRDD(sc, numRows, numCols, 1).toDF()
-
-// Repartition into a more parallelism-friendly number of partitions
-val data = rawData.repartition(64).cache()
-{% endhighlight %}
-
-**Output:**
-{% highlight scala %}
-import org.apache.spark.mllib.util.LinearDataGenerator
-numRows: Int = 10000
-numCols: Int = 1000
-rawData: org.apache.spark.sql.DataFrame = [label: double, features: vector]
-data: org.apache.spark.sql.DataFrame = [label: double, features: vector]
-{% endhighlight %}
-
-
-## Train using Spark ML Linear Regression Algorithm for Comparison
-
-For purposes of comparison, we can train a model using the Spark ML linear regression algorithm.
-
-**Cell:**
-{% highlight scala %}
-// Spark ML
-import org.apache.spark.ml.regression.LinearRegression
-
-// Model Settings
-val maxIters = 100
-val reg = 0
-val elasticNetParam = 0  // L2 reg
-
-// Fit the model
-val lr = new LinearRegression()
-  .setMaxIter(maxIters)
-  .setRegParam(reg)
-  .setElasticNetParam(elasticNetParam)
-val start = System.currentTimeMillis()
-val model = lr.fit(data)
-val trainingTime = (System.currentTimeMillis() - start).toDouble / 1000.0
-
-// Summarize the model over the training set and gather some metrics
-val trainingSummary = model.summary
-val r2 = trainingSummary.r2
-val iters = trainingSummary.totalIterations
-val trainingTimePerIter = trainingTime / iters
-{% endhighlight %}
-
-**Output:**
-{% highlight scala %}
-import org.apache.spark.ml.regression.LinearRegression
-maxIters: Int = 100
-reg: Int = 0
-elasticNetParam: Int = 0
-lr: org.apache.spark.ml.regression.LinearRegression = linReg_a7f51d676562
-start: Long = 1444672044647
-model: org.apache.spark.ml.regression.LinearRegressionModel = linReg_a7f51d676562
-trainingTime: Double = 12.985
-trainingSummary: org.apache.spark.ml.regression.LinearRegressionTrainingSummary = org.apache.spark.ml.regression.LinearRegressionTrainingSummary@227ba28b
-r2: Double = 0.9677118209276552
-iters: Int = 17
-trainingTimePerIter: Double = 0.7638235294117647
-{% endhighlight %}
-
-
-## Spark ML Linear Regression Summary Statistics
-
-Summary statistics for the Spark ML linear regression algorithm are displayed 
by this cell.
-
-**Cell:**
-{% highlight scala %}
-// Print statistics
-println(s"R2: ${r2}")
-println(s"Iterations: ${iters}")
-println(s"Training time per iter: ${trainingTimePerIter} seconds")
-{% endhighlight %}
-
-**Output:**
-{% highlight scala %}
-R2: 0.9677118209276552
-Iterations: 17
-Training time per iter: 0.7638235294117647 seconds
-{% endhighlight %}
-
-
-## SystemML Linear Regression Algorithm
-
-The `linearReg` fixed `String` variable is set to
-a linear regression algorithm written in DML, SystemML's Declarative Machine 
Learning language.
-
-
-
-**Cell:**
-{% highlight scala %}
-// SystemML kernels
-val linearReg =
-"""
-#
-# THIS SCRIPT SOLVES LINEAR REGRESSION USING THE CONJUGATE GRADIENT ALGORITHM
-#
-# INPUT PARAMETERS:
-# --------------------------------------------------------------------------------------------
-# NAME  TYPE   DEFAULT  MEANING
-# --------------------------------------------------------------------------------------------
-# X     String  ---     Matrix X of feature vectors
-# Y     String  ---     1-column Matrix Y of response values
-# icpt  Int      0      Intercept presence, shifting and rescaling the columns of X:
-#                       0 = no intercept, no shifting, no rescaling;
-#                       1 = add intercept, but neither shift nor rescale X;
-#                       2 = add intercept, shift & rescale X columns to mean = 0, variance = 1
-# reg   Double 0.000001 Regularization constant (lambda) for L2-regularization; set to nonzero
-#                       for highly dependent/sparse/numerous features
-# tol   Double 0.000001 Tolerance (epsilon); conjugate gradient procedure terminates early if
-#                       L2 norm of the beta-residual is less than tolerance * its initial norm
-# maxi  Int      0      Maximum number of conjugate gradient iterations, 0 = no maximum
-# --------------------------------------------------------------------------------------------
-#
-# OUTPUT:
-# B Estimated regression parameters (the betas) to store
-#
-# Note: Matrix of regression parameters (the betas) and its size depend on icpt input value:
-#         OUTPUT SIZE:   OUTPUT CONTENTS:                HOW TO PREDICT Y FROM X AND B:
-# icpt=0: ncol(X)   x 1  Betas for X only                Y ~ X %*% B[1:ncol(X), 1], or just X %*% B
-# icpt=1: ncol(X)+1 x 1  Betas for X and intercept       Y ~ X %*% B[1:ncol(X), 1] + B[ncol(X)+1, 1]
-# icpt=2: ncol(X)+1 x 2  Col.1: betas for X & intercept  Y ~ X %*% B[1:ncol(X), 1] + B[ncol(X)+1, 1]
-#                        Col.2: betas for shifted/rescaled X and intercept
-#
-
-fileX = "";
-fileY = "";
-fileB = "";
-
-intercept_status = ifdef ($icpt, 0);     # $icpt=0;
-tolerance = ifdef ($tol, 0.000001);      # $tol=0.000001;
-max_iteration = ifdef ($maxi, 0);        # $maxi=0;
-regularization = ifdef ($reg, 0.000001); # $reg=0.000001;
-
-X = read (fileX);
-y = read (fileY);
-
-n = nrow (X);
-m = ncol (X);
-ones_n = matrix (1, rows = n, cols = 1);
-zero_cell = matrix (0, rows = 1, cols = 1);
-
-# Introduce the intercept, shift and rescale the columns of X if needed
-
-m_ext = m;
-if (intercept_status == 1 | intercept_status == 2)  # add the intercept column
-{
-    X = append (X, ones_n);
-    m_ext = ncol (X);
-}
-
-scale_lambda = matrix (1, rows = m_ext, cols = 1);
-if (intercept_status == 1 | intercept_status == 2)
-{
-    scale_lambda [m_ext, 1] = 0;
-}
-
-if (intercept_status == 2)  # scale-&-shift X columns to mean 0, variance 1
-{                           # Important assumption: X [, m_ext] = ones_n
-    avg_X_cols = t(colSums(X)) / n;
-    var_X_cols = (t(colSums (X ^ 2)) - n * (avg_X_cols ^ 2)) / (n - 1);
-    is_unsafe = ppred (var_X_cols, 0.0, "<=");
-    scale_X = 1.0 / sqrt (var_X_cols * (1 - is_unsafe) + is_unsafe);
-    scale_X [m_ext, 1] = 1;
-    shift_X = - avg_X_cols * scale_X;
-    shift_X [m_ext, 1] = 0;
-} else {
-    scale_X = matrix (1, rows = m_ext, cols = 1);
-    shift_X = matrix (0, rows = m_ext, cols = 1);
-}
-
-# Henceforth, if intercept_status == 2, we use "X %*% (SHIFT/SCALE TRANSFORM)"
-# instead of "X".  However, in order to preserve the sparsity of X,
-# we apply the transform associatively to some other part of the expression
-# in which it occurs.  To avoid materializing a large matrix, we rewrite it:
-#
-# ssX_A  = (SHIFT/SCALE TRANSFORM) %*% A    --- is rewritten as:
-# ssX_A  = diag (scale_X) %*% A;
-# ssX_A [m_ext, ] = ssX_A [m_ext, ] + t(shift_X) %*% A;
-#
-# tssX_A = t(SHIFT/SCALE TRANSFORM) %*% A   --- is rewritten as:
-# tssX_A = diag (scale_X) %*% A + shift_X %*% A [m_ext, ];
-
-lambda = scale_lambda * regularization;
-beta_unscaled = matrix (0, rows = m_ext, cols = 1);
-
-if (max_iteration == 0) {
-    max_iteration = m_ext;
-}
-i = 0;
-
-# BEGIN THE CONJUGATE GRADIENT ALGORITHM
-r = - t(X) %*% y;
-
-if (intercept_status == 2) {
-    r = scale_X * r + shift_X %*% r [m_ext, ];
-}
-
-p = - r;
-norm_r2 = sum (r ^ 2);
-norm_r2_initial = norm_r2;
-norm_r2_target = norm_r2_initial * tolerance ^ 2;
-
-while (i < max_iteration & norm_r2 > norm_r2_target)
-{
-    if (intercept_status == 2) {
-        ssX_p = scale_X * p;
-        ssX_p [m_ext, ] = ssX_p [m_ext, ] + t(shift_X) %*% p;
-    } else {
-        ssX_p = p;
-    }
-
-    q = t(X) %*% (X %*% ssX_p);
-
-    if (intercept_status == 2) {
-        q = scale_X * q + shift_X %*% q [m_ext, ];
-    }
-
-    q = q + lambda * p;
-    a = norm_r2 / sum (p * q);
-    beta_unscaled = beta_unscaled + a * p;
-    r = r + a * q;
-    old_norm_r2 = norm_r2;
-    norm_r2 = sum (r ^ 2);
-    p = -r + (norm_r2 / old_norm_r2) * p;
-    i = i + 1;
-}
-# END THE CONJUGATE GRADIENT ALGORITHM
-
-if (intercept_status == 2) {
-    beta = scale_X * beta_unscaled;
-    beta [m_ext, ] = beta [m_ext, ] + t(shift_X) %*% beta_unscaled;
-} else {
-    beta = beta_unscaled;
-}
-
-# Output statistics
-avg_tot = sum (y) / n;
-ss_tot = sum (y ^ 2);
-ss_avg_tot = ss_tot - n * avg_tot ^ 2;
-var_tot = ss_avg_tot / (n - 1);
-y_residual = y - X %*% beta;
-avg_res = sum (y_residual) / n;
-ss_res = sum (y_residual ^ 2);
-ss_avg_res = ss_res - n * avg_res ^ 2;
-
-R2_temp = 1 - ss_res / ss_avg_tot
-R2 = matrix(R2_temp, rows=1, cols=1)
-write(R2, "")
-
-totalIters = matrix(i, rows=1, cols=1)
-write(totalIters, "")
-
-# Prepare the output matrix
-if (intercept_status == 2) {
-    beta_out = append (beta, beta_unscaled);
-} else {
-    beta_out = beta;
-}
-
-write (beta_out, fileB);
-"""
-{% endhighlight %}
-
-**Output:**
-
-None
-
-
-## Helper Methods
-
-This cell contains helper methods to return `Double` and `Int` values from 
output generated by the `MLContext` API.
-
-**Cell:**
-{% highlight scala %}
-// Helper functions
-import org.apache.sysml.api.MLOutput
-
-def getScalar(outputs: MLOutput, symbol: String): Any =
-    outputs.getDF(spark.sqlContext, symbol).first()(1)
-
-def getScalarDouble(outputs: MLOutput, symbol: String): Double =
-    getScalar(outputs, symbol).asInstanceOf[Double]
-
-def getScalarInt(outputs: MLOutput, symbol: String): Int =
-    getScalarDouble(outputs, symbol).toInt
-{% endhighlight %}
-
-**Output:**
-{% highlight scala %}
-import org.apache.sysml.api.MLOutput
-getScalar: (outputs: org.apache.sysml.api.MLOutput, symbol: String)Any
-getScalarDouble: (outputs: org.apache.sysml.api.MLOutput, symbol: String)Double
-getScalarInt: (outputs: org.apache.sysml.api.MLOutput, symbol: String)Int
-{% endhighlight %}
-
-
-## Convert DataFrame to Binary-Block Format
-
-SystemML uses a binary-block format for matrix data representation. This cell
-explicitly converts the `DataFrame` `data` object to a binary-block `features` 
matrix
-and single-column `label` matrix, both represented by the
-`JavaPairRDD[MatrixIndexes, MatrixBlock]` datatype.
-
-
-**Cell:**
-{% highlight scala %}
-// Imports
-import org.apache.sysml.api.MLContext
-import org.apache.sysml.runtime.instructions.spark.utils.{RDDConverterUtilsExt => RDDConverterUtils}
-import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
-
-// Create SystemML context
-val ml = new MLContext(sc)
-
-// Convert data to proper format
-val mcX = new MatrixCharacteristics(numRows, numCols, 1000, 1000)
-val mcY = new MatrixCharacteristics(numRows, 1, 1000, 1000)
-val X = RDDConverterUtils.vectorDataFrameToBinaryBlock(sc, data, mcX, false, "features")
-val y = RDDConverterUtils.dataFrameToBinaryBlock(sc, data.select("label"), mcY, false)
-// val y = data.select("label")
-
-// Cache
-val X2 = X.cache()
-val y2 = y.cache()
-val cnt1 = X2.count()
-val cnt2 = y2.count()
-{% endhighlight %}
-
-**Output:**
-{% highlight scala %}
-import org.apache.sysml.api.MLContext
-import org.apache.sysml.runtime.instructions.spark.utils.{RDDConverterUtilsExt=>RDDConverterUtils}
-import org.apache.sysml.runtime.matrix.MatrixCharacteristics
-ml: org.apache.sysml.api.MLContext = org.apache.sysml.api.MLContext@38d59245
-mcX: org.apache.sysml.runtime.matrix.MatrixCharacteristics = [10000 x 1000, nnz=-1, blocks (1000 x 1000)]
-mcY: org.apache.sysml.runtime.matrix.MatrixCharacteristics = [10000 x 1, nnz=-1, blocks (1000 x 1000)]
-X: org.apache.spark.api.java.JavaPairRDD[org.apache.sysml.runtime.matrix.data.MatrixIndexes,org.apache.sysml.runtime.matrix.data.MatrixBlock] = org.apache.spark.api.java.JavaPairRDD@b5a86e3
-y: org.apache.spark.api.java.JavaPairRDD[org.apache.sysml.runtime.matrix.data.MatrixIndexes,org.apache.sysml.runtime.matrix.data.MatrixBlock] = org.apache.spark.api.java.JavaPairRDD@56377665
-X2: org.apache.spark.api.java.JavaPairRDD[org.apache.sysml.runtime.matrix.data.MatrixIndexes,org.apache.sysml.runtime.matrix.data.MatrixBlock] = org.apache.spark.api.java.JavaPairRDD@650f29d2
-y2: org.apache.spark.api.java.JavaPairRDD[org.apache.sysml.runtime.matrix.data.MatrixIndexes,org.apache.sysml.runtime.matrix.data.MatrixBlock] = org.apache.spark.api.java.JavaPairRDD@334857a8
-cnt1: Long = 10
-cnt2: Long = 10
-{% endhighlight %}
-
-
-## Train using SystemML Linear Regression Algorithm
-
-Now, we can train our model using the SystemML linear regression algorithm. We register the features matrix `X`
-and the label matrix `y` as inputs. We register the `beta_out` matrix, `R2`, and `totalIters` as outputs.
-
-**Cell:**
-{% highlight scala %}
-// Register inputs & outputs
-ml.reset()  
-ml.registerInput("X", X, numRows, numCols)
-ml.registerInput("y", y, numRows, 1)
-// ml.registerInput("y", y)
-ml.registerOutput("beta_out")
-ml.registerOutput("R2")
-ml.registerOutput("totalIters")
-
-// Run the script
-val start = System.currentTimeMillis()
-val outputs = ml.executeScript(linearReg)
-val trainingTime = (System.currentTimeMillis() - start).toDouble / 1000.0
-
-// Get outputs
-val B = outputs.getDF(spark.sqlContext, "beta_out").sort("ID").drop("ID")
-val r2 = getScalarDouble(outputs, "R2")
-val iters = getScalarInt(outputs, "totalIters")
-val trainingTimePerIter = trainingTime / iters
-{% endhighlight %}
-
-**Output:**
-{% highlight scala %}
-start: Long = 1444672090620
-outputs: org.apache.sysml.api.MLOutput = org.apache.sysml.api.MLOutput@5d2c22d0
-trainingTime: Double = 1.176
-B: org.apache.spark.sql.DataFrame = [C1: double]
-r2: Double = 0.9677079547216473
-iters: Int = 12
-trainingTimePerIter: Double = 0.09799999999999999
-{% endhighlight %}
-
-
-## SystemML Linear Regression Summary Statistics
-
-SystemML linear regression summary statistics are displayed by this cell.
-
-**Cell:**
-{% highlight scala %}
-// Print statistics
-println(s"R2: ${r2}")
-println(s"Iterations: ${iters}")
-println(s"Training time per iter: ${trainingTimePerIter} seconds")
-B.describe().show()
-{% endhighlight %}
-
-**Output:**
-{% highlight scala %}
-R2: 0.9677079547216473
-Iterations: 12
-Training time per iter: 0.2334166666666667 seconds
-+-------+-------------------+
-|summary|                 C1|
-+-------+-------------------+
-|  count|               1000|
-|   mean| 0.0184500840658385|
-| stddev| 0.2764750319432085|
-|    min|-0.5426068958986378|
-|    max| 0.5225309861616542|
-+-------+-------------------+
-{% endhighlight %}
-
-
----
-
-# Jupyter (PySpark) Notebook Example - Poisson Nonnegative Matrix Factorization - OLD API
-
-### **NOTE: This API is old and has been deprecated.**
-**Please use the [new MLContext API](spark-mlcontext-programming-guide#jupyter-pyspark-notebook-example---poisson-nonnegative-matrix-factorization) instead.**
-
-Here, we'll explore the use of SystemML via PySpark in a [Jupyter notebook](http://jupyter.org/).
-This Jupyter notebook example can be nicely viewed in a rendered state
-[on GitHub](https://github.com/apache/incubator-systemml/blob/master/samples/jupyter-notebooks/SystemML-PySpark-Recommendation-Demo.ipynb),
-and can be [downloaded here](https://raw.githubusercontent.com/apache/incubator-systemml/master/samples/jupyter-notebooks/SystemML-PySpark-Recommendation-Demo.ipynb) to a directory of your choice.
-
-From the directory with the downloaded notebook, start Jupyter with PySpark:
-
-    PYSPARK_DRIVER_PYTHON=jupyter PYSPARK_DRIVER_PYTHON_OPTS="notebook" $SPARK_HOME/bin/pyspark --master local[*] --driver-class-path $SYSTEMML_HOME/SystemML.jar
-
-This will open Jupyter in a browser:
-
-![Jupyter Notebook](img/spark-mlcontext-programming-guide/jupyter1.png "Jupyter Notebook")
-
-We can then open up the `SystemML-PySpark-Recommendation-Demo` notebook:
-
-![Jupyter Notebook](img/spark-mlcontext-programming-guide/jupyter2.png "Jupyter Notebook")
-
-## Set up the notebook and download the data
-
-{% highlight python %}
-%load_ext autoreload
-%autoreload 2
-%matplotlib inline
-
-# Add SystemML PySpark API file.
-sc.addPyFile("https://raw.githubusercontent.com/apache/incubator-systemml/3d5f9b11741f6d6ecc6af7cbaa1069cde32be838/src/main/java/org/apache/sysml/api/python/SystemML.py")
-
-import numpy as np
-import matplotlib.pyplot as plt
-plt.rcParams['figure.figsize'] = (10, 6)
-{% endhighlight %}
-
-{% highlight python %}
-%%sh
-# Download dataset
-curl -O http://snap.stanford.edu/data/amazon0601.txt.gz
-gunzip amazon0601.txt.gz
-{% endhighlight %}
-
-## Use PySpark to load the data in as a Spark DataFrame
-
-{% highlight python %}
-# Load data
-import pyspark.sql.functions as F
-dataPath = "amazon0601.txt"
-
-X_train = (sc.textFile(dataPath)
-    .filter(lambda l: not l.startswith("#"))
-    .map(lambda l: l.split("\t"))
-    .map(lambda prods: (int(prods[0]), int(prods[1]), 1.0))
-    .toDF(("prod_i", "prod_j", "x_ij"))
-    .filter("prod_i < 500 AND prod_j < 500") # Filter for memory constraints
-    .cache())
-
-max_prod_i = X_train.select(F.max("prod_i")).first()[0]
-max_prod_j = X_train.select(F.max("prod_j")).first()[0]
-numProducts = max(max_prod_i, max_prod_j) + 1 # 0-based indexing
-print("Total number of products: {}".format(numProducts))
-{% endhighlight %}
-
-## Create a SystemML MLContext object
-
-{% highlight python %}
-# Create SystemML MLContext
-from SystemML import MLContext
-ml = MLContext(sc)
-{% endhighlight %}
-
-## Define a kernel for Poisson nonnegative matrix factorization (PNMF) in DML
-
-{% highlight python %}
-# Define PNMF kernel in SystemML's DSL using the R-like syntax for PNMF
-pnmf = """
-# data & args
-X = read($X)
-X = X+1 # change product IDs to be 1-based, rather than 0-based
-V = table(X[,1], X[,2])
-size = ifdef($size, -1)
-if(size > -1) {
-    V = V[1:size,1:size]
-}
-max_iteration = as.integer($maxiter)
-rank = as.integer($rank)
-
-n = nrow(V)
-m = ncol(V)
-range = 0.01
-W = Rand(rows=n, cols=rank, min=0, max=range, pdf="uniform")
-H = Rand(rows=rank, cols=m, min=0, max=range, pdf="uniform")
-losses = matrix(0, rows=max_iteration, cols=1)
-
-# run PNMF
-i=1
-while(i <= max_iteration) {
-  # update params
-  H = (H * (t(W) %*% (V/(W%*%H))))/t(colSums(W))
-  W = (W * ((V/(W%*%H)) %*% t(H)))/t(rowSums(H))
-
-  # compute loss
-  losses[i,] = -1 * (sum(V*log(W%*%H)) - as.scalar(colSums(W)%*%rowSums(H)))
-  i = i + 1;
-}
-
-# write outputs
-write(losses, $lossout)
-write(W, $Wout)
-write(H, $Hout)
-"""
-{% endhighlight %}
-
-## Execute the algorithm
-
-{% highlight python %}
-# Run the PNMF script on SystemML with Spark
-ml.reset()
-outputs = ml.executeScript(pnmf, {"X": X_train, "maxiter": 100, "rank": 10}, ["W", "H", "losses"])
-{% endhighlight %}
-
-## Retrieve the losses during training and plot them
-
-{% highlight python %}
-# Plot training loss over time
-losses = outputs.getDF(spark.sqlContext, "losses")
-xy = losses.sort(losses.ID).map(lambda r: (r[0], r[1])).collect()
-x, y = zip(*xy)
-plt.plot(x, y)
-plt.xlabel('Iteration')
-plt.ylabel('Loss')
-plt.title('PNMF Training Loss')
-{% endhighlight %}
-
-![Jupyter Loss Graph](img/spark-mlcontext-programming-guide/jupyter_loss_graph.png "Jupyter Loss Graph")
-
----
-
 # Recommended Spark Configuration Settings
 
 For best performance, we recommend setting the following flags when running 
SystemML with Spark:

