[SYSTEMML-451] Initial version of python matrix class - Added matrix class that supports lazy evaluation of elementary matrix operations.
- Updated documentation for Python users that explains usage of mllearn, matrix and mlcontext. - Added a setup file for pip installer. Closes #197 Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/48a7267f Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/48a7267f Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/48a7267f Branch: refs/heads/master Commit: 48a7267f804f56cc9859b6a89d3e5f491f5a4558 Parents: 9279e8b Author: Niketan Pansare <[email protected]> Authored: Sun Aug 28 09:53:35 2016 -0700 Committer: Niketan Pansare <[email protected]> Committed: Sun Aug 28 09:53:35 2016 -0700 ---------------------------------------------------------------------- docs/_layouts/global.html | 1 + docs/algorithms-classification.md | 36 +- docs/algorithms-regression.md | 16 +- docs/beginners-guide-python.md | 334 +++++++++++++++++++ docs/index.md | 2 + .../apache/sysml/api/mlcontext/MLResults.java | 21 +- .../org/apache/sysml/api/python/SystemML.py | 230 ------------- .../java/org/apache/sysml/api/python/test.py | 178 ---------- src/main/python/MANIFEST.in | 29 ++ src/main/python/SystemML.py | 258 -------------- src/main/python/SystemML/__init__.py | 29 ++ src/main/python/SystemML/converters.py | 100 ++++++ src/main/python/SystemML/defmatrix.py | 295 ++++++++++++++++ src/main/python/SystemML/mlcontext.py | 296 ++++++++++++++++ src/main/python/SystemML/mllearn/__init__.py | 25 ++ src/main/python/SystemML/mllearn/estimators.py | 302 +++++++++++++++++ src/main/python/SystemMLtests.py | 104 ------ src/main/python/setup.py | 77 +++++ src/main/python/tests/test_mlcontext.py | 104 ++++++ src/main/python/tests/test_mllearn.py | 178 ++++++++++ src/main/python/uploadToPyPI.sh | 34 ++ 21 files changed, 1852 insertions(+), 797 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/48a7267f/docs/_layouts/global.html ---------------------------------------------------------------------- diff --git a/docs/_layouts/global.html b/docs/_layouts/global.html index aebd204..a866cee 100644 --- a/docs/_layouts/global.html +++ b/docs/_layouts/global.html @@ -56,6 +56,7 @@ <li><b>Language Guides:</b></li> <li><a href="dml-language-reference.html">DML Language Reference</a></li> <li><a href="beginners-guide-to-dml-and-pydml.html">Beginner's Guide to DML and PyDML</a></li> + <li><a href="beginners-guide-python.html">Beginner's Guide for Python users</a></li> <li class="divider"></li> <li><b>ML Algorithms:</b></li> <li><a href="algorithms-reference.html">Algorithms Reference</a></li> http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/48a7267f/docs/algorithms-classification.md ---------------------------------------------------------------------- diff --git a/docs/algorithms-classification.md b/docs/algorithms-classification.md index f25d78e..03c78d6 100644 --- a/docs/algorithms-classification.md +++ b/docs/algorithms-classification.md @@ -129,9 +129,9 @@ Eqs. (1) and (2). <div class="codetabs"> <div data-lang="Python" markdown="1"> {% highlight python %} -import SystemML as sml +from SystemML.mllearn import LogisticRegression # C = 1/reg -logistic = sml.mllearn.LogisticRegression(sqlCtx, fit_intercept=True, max_iter=100, max_inner_iter=0, tol=0.000001, C=1.0) +logistic = LogisticRegression(sqlCtx, fit_intercept=True, max_iter=100, max_inner_iter=0, tol=0.000001, C=1.0) # X_train, y_train and X_test can be NumPy matrices or Pandas DataFrame or SciPy Sparse Matrix y_test = logistic.fit(X_train, y_train).predict(X_test) # df_train is DataFrame that contains two columns: "features" (of type Vector) and "label". df_test is a DataFrame that contains the column "features" @@ -229,7 +229,7 @@ SystemML Language Reference for details. 
{% highlight python %} # Scikit-learn way from sklearn import datasets, neighbors -import SystemML as sml +from SystemML.mllearn import LogisticRegression from pyspark.sql import SQLContext sqlCtx = SQLContext(sc) digits = datasets.load_digits() @@ -240,12 +240,12 @@ X_train = X_digits[:.9 * n_samples] y_train = y_digits[:.9 * n_samples] X_test = X_digits[.9 * n_samples:] y_test = y_digits[.9 * n_samples:] -logistic = sml.mllearn.LogisticRegression(sqlCtx) +logistic = LogisticRegression(sqlCtx) print('LogisticRegression score: %f' % logistic.fit(X_train, y_train).score(X_test, y_test)) # MLPipeline way from pyspark.ml import Pipeline -import SystemML as sml +from SystemML.mllearn import LogisticRegression from pyspark.ml.feature import HashingTF, Tokenizer from pyspark.sql import SQLContext sqlCtx = SQLContext(sc) @@ -265,7 +265,7 @@ training = sqlCtx.createDataFrame([ ], ["id", "text", "label"]) tokenizer = Tokenizer(inputCol="text", outputCol="words") hashingTF = HashingTF(inputCol="words", outputCol="features", numFeatures=20) -lr = sml.mllearn.LogisticRegression(sqlCtx) +lr = LogisticRegression(sqlCtx) pipeline = Pipeline(stages=[tokenizer, hashingTF, lr]) model = pipeline.fit(training) test = sqlCtx.createDataFrame([ @@ -458,9 +458,9 @@ support vector machine (`y` with domain size `2`). <div class="codetabs"> <div data-lang="Python" markdown="1"> {% highlight python %} -import SystemML as sml +from SystemML.mllearn import SVM # C = 1/reg -svm = sml.mllearn.SVM(sqlCtx, fit_intercept=True, max_iter=100, tol=0.000001, C=1.0, is_multi_class=False) +svm = SVM(sqlCtx, fit_intercept=True, max_iter=100, tol=0.000001, C=1.0, is_multi_class=False) # X_train, y_train and X_test can be NumPy matrices or Pandas DataFrame or SciPy Sparse Matrix y_test = svm.fit(X_train, y_train) # df_train is DataFrame that contains two columns: "features" (of type Vector) and "label". df_test is a DataFrame that contains the column "features" @@ -714,9 +714,9 @@ class labels. 
<div class="codetabs"> <div data-lang="Python" markdown="1"> {% highlight python %} -import SystemML as sml +from SystemML.mllearn import SVM # C = 1/reg -svm = sml.mllearn.SVM(sqlCtx, fit_intercept=True, max_iter=100, tol=0.000001, C=1.0, is_multi_class=True) +svm = SVM(sqlCtx, fit_intercept=True, max_iter=100, tol=0.000001, C=1.0, is_multi_class=True) # X_train, y_train and X_test can be NumPy matrices or Pandas DataFrame or SciPy Sparse Matrix y_test = svm.fit(X_train, y_train) # df_train is DataFrame that contains two columns: "features" (of type Vector) and "label". df_test is a DataFrame that contains the column "features" @@ -852,7 +852,7 @@ SystemML Language Reference for details. {% highlight python %} # Scikit-learn way from sklearn import datasets, neighbors -import SystemML as sml +from SystemML.mllearn import SVM from pyspark.sql import SQLContext sqlCtx = SQLContext(sc) digits = datasets.load_digits() @@ -863,12 +863,12 @@ X_train = X_digits[:.9 * n_samples] y_train = y_digits[:.9 * n_samples] X_test = X_digits[.9 * n_samples:] y_test = y_digits[.9 * n_samples:] -svm = sml.mllearn.SVM(sqlCtx, is_multi_class=True) +svm = SVM(sqlCtx, is_multi_class=True) print('LogisticRegression score: %f' % svm.fit(X_train, y_train).score(X_test, y_test)) # MLPipeline way from pyspark.ml import Pipeline -import SystemML as sml +from SystemML.mllearn import SVM from pyspark.ml.feature import HashingTF, Tokenizer from pyspark.sql import SQLContext sqlCtx = SQLContext(sc) @@ -888,7 +888,7 @@ training = sqlCtx.createDataFrame([ ], ["id", "text", "label"]) tokenizer = Tokenizer(inputCol="text", outputCol="words") hashingTF = HashingTF(inputCol="words", outputCol="features", numFeatures=20) -svm = sml.mllearn.SVM(sqlCtx, is_multi_class=True) +svm = SVM(sqlCtx, is_multi_class=True) pipeline = Pipeline(stages=[tokenizer, hashingTF, svm]) model = pipeline.fit(training) test = sqlCtx.createDataFrame([ @@ -1026,8 +1026,8 @@ applicable when all features are counts of categorical 
values. <div class="codetabs"> <div data-lang="Python" markdown="1"> {% highlight python %} -import SystemML as sml -nb = sml.mllearn.NaiveBayes(sqlCtx, laplace=1.0) +from SystemML.mllearn import NaiveBayes +nb = NaiveBayes(sqlCtx, laplace=1.0) # X_train, y_train and X_test can be NumPy matrices or Pandas DataFrame or SciPy Sparse Matrix y_test = nb.fit(X_train, y_train) # df_train is DataFrame that contains two columns: "features" (of type Vector) and "label". df_test is a DataFrame that contains the column "features" @@ -1149,7 +1149,7 @@ SystemML Language Reference for details. {% highlight python %} from sklearn.datasets import fetch_20newsgroups from sklearn.feature_extraction.text import TfidfVectorizer -import SystemML as sml +from SystemML.mllearn import NaiveBayes from sklearn import metrics from pyspark.sql import SQLContext sqlCtx = SQLContext(sc) @@ -1160,7 +1160,7 @@ vectorizer = TfidfVectorizer() # Both vectors and vectors_test are SciPy CSR matrix vectors = vectorizer.fit_transform(newsgroups_train.data) vectors_test = vectorizer.transform(newsgroups_test.data) -nb = sml.mllearn.NaiveBayes(sqlCtx) +nb = NaiveBayes(sqlCtx) nb.fit(vectors, newsgroups_train.target) pred = nb.predict(vectors_test) metrics.f1_score(newsgroups_test.target, pred, average='weighted') http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/48a7267f/docs/algorithms-regression.md ---------------------------------------------------------------------- diff --git a/docs/algorithms-regression.md b/docs/algorithms-regression.md index 5241f5f..6585b00 100644 --- a/docs/algorithms-regression.md +++ b/docs/algorithms-regression.md @@ -82,9 +82,9 @@ efficient when the number of features $m$ is relatively small <div class="codetabs"> <div data-lang="Python" markdown="1"> {% highlight python %} -import SystemML as sml +from SystemML.mllearn import LinearRegression # C = 1/reg -lr = sml.mllearn.LinearRegression(sqlCtx, fit_intercept=True, C=1.0, solver='direct-solve') +lr = 
LinearRegression(sqlCtx, fit_intercept=True, C=1.0, solver='direct-solve') # X_train, y_train and X_test can be NumPy matrices or Pandas DataFrame or SciPy Sparse Matrix y_test = lr.fit(X_train, y_train) # df_train is DataFrame that contains two columns: "features" (of type Vector) and "label". df_test is a DataFrame that contains the column "features" @@ -124,9 +124,9 @@ y_test = lr.fit(df_train) <div class="codetabs"> <div data-lang="Python" markdown="1"> {% highlight python %} -import SystemML as sml +from SystemML.mllearn import LinearRegression # C = 1/reg -lr = sml.mllearn.LinearRegression(sqlCtx, fit_intercept=True, max_iter=100, tol=0.000001, C=1.0, solver='newton-cg') +lr = LinearRegression(sqlCtx, fit_intercept=True, max_iter=100, tol=0.000001, C=1.0, solver='newton-cg') # X_train, y_train and X_test can be NumPy matrices or Pandas DataFrames or SciPy Sparse matrices y_test = lr.fit(X_train, y_train) # df_train is DataFrame that contains two columns: "features" (of type Vector) and "label". df_test is a DataFrame that contains the column "features" @@ -222,7 +222,7 @@ SystemML Language Reference for details. 
{% highlight python %} import numpy as np from sklearn import datasets -import SystemML as sml +from SystemML.mllearn import LinearRegression from pyspark.sql import SQLContext # Load the diabetes dataset diabetes = datasets.load_diabetes() @@ -235,7 +235,7 @@ diabetes_X_test = diabetes_X[-20:] diabetes_y_train = diabetes.target[:-20] diabetes_y_test = diabetes.target[-20:] # Create linear regression object -regr = sml.mllearn.LinearRegression(sqlCtx, solver='direct-solve') +regr = LinearRegression(sqlCtx, solver='direct-solve') # Train the model using the training sets regr.fit(diabetes_X_train, diabetes_y_train) # The mean square error @@ -277,7 +277,7 @@ print("Residual sum of squares: %.2f" % np.mean((regr.predict(diabetes_X_test) - {% highlight python %} import numpy as np from sklearn import datasets -import SystemML as sml +from SystemML.mllearn import LinearRegression from pyspark.sql import SQLContext # Load the diabetes dataset diabetes = datasets.load_diabetes() @@ -290,7 +290,7 @@ diabetes_X_test = diabetes_X[-20:] diabetes_y_train = diabetes.target[:-20] diabetes_y_test = diabetes.target[-20:] # Create linear regression object -regr = sml.mllearn.LinearRegression(sqlCtx, solver='newton-cg') +regr = LinearRegression(sqlCtx, solver='newton-cg') # Train the model using the training sets regr.fit(diabetes_X_train, diabetes_y_train) # The mean square error http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/48a7267f/docs/beginners-guide-python.md ---------------------------------------------------------------------- diff --git a/docs/beginners-guide-python.md b/docs/beginners-guide-python.md new file mode 100644 index 0000000..790ed43 --- /dev/null +++ b/docs/beginners-guide-python.md @@ -0,0 +1,334 @@ +--- +layout: global +title: Beginner's Guide for Python users +description: Beginner's Guide for Python users +--- +<!-- +{% comment %} +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. 
See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to you under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +{% endcomment %} +--> + +* This will become a table of contents (this text will be scraped). +{:toc} + +<br/> + +## Introduction + +SystemML enables flexible, scalable machine learning. This flexibility is achieved through the specification of a high-level declarative machine learning language that comes in two flavors, +one with an R-like syntax (DML) and one with a Python-like syntax (PyDML). + +Algorithm scripts written in DML and PyDML can be run on Hadoop, on Spark, or in Standalone mode. +No script modifications are required to change between modes. SystemML automatically performs advanced optimizations +based on data and cluster characteristics, so much of the need to manually tweak algorithms is largely reduced or eliminated. +To understand more about DML and PyDML, we recommend that you read [Beginner's Guide to DML and PyDML](https://apache.github.io/incubator-systemml/beginners-guide-to-dml-and-pydml.html). + +For convenience of Python users, SystemML exposes several language-level APIs that allow Python users to use SystemML +and its algorithms without the need to know DML or PyDML. We explain these APIs in the below sections with example usecases. + +## Download & Setup + +Before you get started on SystemML, make sure that your environment is set up and ready to go. 
+ +### Install Java (need Java 8) and Apache Spark + +If you already have an Apache Spark installation, you can skip this step. + +<div class="codetabs"> +<div data-lang="OSX" markdown="1"> +```bash +/usr/bin/ruby -e "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/master/install)" +brew tap caskroom/cask +brew install Caskroom/cask/java +brew install apache-spark +``` +</div> +<div data-lang="Linux" markdown="1"> +```bash +ruby -e "$(curl -fsSL https://raw.githubusercontent.com/Linuxbrew/install/master/install)" +brew tap caskroom/cask +brew install Caskroom/cask/java +brew install apache-spark +``` +</div> +</div> + +### Install SystemML + +#### Step 1: Install SystemML Python package + +```bash +pip install SystemML +``` + +#### Step 2: Download SystemML Java binaries + +SystemML Python package downloads the corresponding Java binaries (along with algorithms) and places them +into the installed location. To find the location of the downloaded Java binaries, use the following command: + +```bash +python -c 'import imp; import os; print os.path.join(imp.find_module("SystemML")[1], "SystemML-java")' +``` + +#### Step 3: (Optional but recommended) Set SYSTEMML_HOME environment variable +<div class="codetabs"> +<div data-lang="OSX" markdown="1"> +```bash +SYSTEMML_HOME=`python -c 'import imp; import os; print os.path.join(imp.find_module("SystemML")[1], "SystemML-java")'` +# If you are using zsh or ksh or csh, append it to ~/.zshrc or ~/.profile or ~/.login respectively. +echo '' >> ~/.bashrc +echo 'export SYSTEMML_HOME='$SYSTEMML_HOME >> ~/.bashrc +``` +</div> +<div data-lang="Linux" markdown="1"> +```bash +SYSTEMML_HOME=`python -c 'import imp; import os; print os.path.join(imp.find_module("SystemML")[1], "SystemML-java")'` +# If you are using zsh or ksh or csh, append it to ~/.zshrc or ~/.profile or ~/.login respectively. 
+echo '' >> ~/.bashrc +echo 'export SYSTEMML_HOME='$SYSTEMML_HOME >> ~/.bashrc +``` +</div> +</div> + +Note: the user is free to either use the prepackaged Java binaries +or download them from [SystemML website](http://systemml.apache.org/download.html) +or build them from the [source](https://github.com/apache/incubator-systemml). + +### Start Pyspark shell + +<div class="codetabs"> +<div data-lang="OSX" markdown="1"> +```bash +pyspark --master local[*] --driver-class-path $SYSTEMML_HOME"/SystemML.jar" +``` +</div> +<div data-lang="Linux" markdown="1"> +```bash +pyspark --master local[*] --driver-class-path $SYSTEMML_HOME"/SystemML.jar" +``` +</div> +</div> + +## Matrix operations + +To get started with SystemML, let's try a few elementary matrix operations: + +```python +import SystemML as sml +import numpy as np +sml.setSparkContext(sc) +m1 = sml.matrix(np.ones((3,3)) + 2) +m2 = sml.matrix(np.ones((3,3)) + 3) +m2 = m1 * (m2 + m1) +m4 = 1.0 - m2 +m4.sum(axis=1).toNumPyArray() +``` + +Output: + +```bash +array([[-60.], + [-60.], + [-60.]]) +``` + +Let us now write a simple script to train [linear regression](https://apache.github.io/incubator-systemml/algorithms-regression.html#linear-regression) +model: $ \beta = solve(X^T X, X^T y) $. For simplicity, we will use the direct-solve method and ignore the regularization parameter as well as the intercept. 
+ +```python +import numpy as np +from sklearn import datasets +import SystemML as sml +from pyspark.sql import SQLContext +# Load the diabetes dataset +diabetes = datasets.load_diabetes() +# Use only one feature +diabetes_X = diabetes.data[:, np.newaxis, 2] +# Split the data into training/testing sets +X_train = diabetes_X[:-20] +X_test = diabetes_X[-20:] +# Split the targets into training/testing sets +y_train = diabetes.target[:-20] +y_test = diabetes.target[-20:] +# Train Linear Regression model +sml.setSparkContext(sc) +X = sml.matrix(X_train) +y = sml.matrix(y_train) +A = X.transpose().dot(X) +b = X.transpose().dot(y) +beta = sml.solve(A, b).toNumPyArray() +y_predicted = X_test.dot(beta) +print('Residual sum of squares: %.2f' % np.mean((y_predicted - y_test) ** 2)) +``` + +Output: + +```bash +Residual sum of squares: 25282.12 +``` + +We can improve the residual error by adding an intercept and regularization parameter. To do so, we will use `mllearn` API described in the next section. + +## Invoke SystemML's algorithms + +SystemML also exposes a subpackage `mllearn`. This subpackage allows Python users to invoke SystemML algorithms +using Scikit-learn or MLPipeline API. + +### Scikit-learn interface + +In the below example, we invoke SystemML's [Linear Regression](https://apache.github.io/incubator-systemml/algorithms-regression.html#linear-regression) +algorithm. 
+ +```python +import numpy as np +from sklearn import datasets +from SystemML.mllearn import LinearRegression +from pyspark.sql import SQLContext +# Load the diabetes dataset +diabetes = datasets.load_diabetes() +# Use only one feature +diabetes_X = diabetes.data[:, np.newaxis, 2] +# Split the data into training/testing sets +X_train = diabetes_X[:-20] +X_test = diabetes_X[-20:] +# Split the targets into training/testing sets +y_train = diabetes.target[:-20] +y_test = diabetes.target[-20:] +# Create linear regression object +regr = LinearRegression(sqlCtx, fit_intercept=True, C=1, solver='direct-solve') +# Train the model using the training sets +regr.fit(X_train, y_train) +y_predicted = regr.predict(X_test) +print('Residual sum of squares: %.2f' % np.mean((y_predicted - y_test) ** 2)) +``` + +Output: + +```bash +Residual sum of squares: 6991.17 +``` + +As expected, by adding an intercept and a regularizer, the residual error drops significantly. + +Here is another example where we invoke SystemML's [Logistic Regression](https://apache.github.io/incubator-systemml/algorithms-classification.html#multinomial-logistic-regression) +algorithm on the digits dataset. 
+ +```python +# Scikit-learn way +from sklearn import datasets, neighbors +from SystemML.mllearn import LogisticRegression +from pyspark.sql import SQLContext +sqlCtx = SQLContext(sc) +digits = datasets.load_digits() +X_digits = digits.data +y_digits = digits.target + 1 +n_samples = len(X_digits) +X_train = X_digits[:.9 * n_samples] +y_train = y_digits[:.9 * n_samples] +X_test = X_digits[.9 * n_samples:] +y_test = y_digits[.9 * n_samples:] +logistic = LogisticRegression(sqlCtx) +print('LogisticRegression score: %f' % logistic.fit(X_train, y_train).score(X_test, y_test)) +``` + +### Passing PySpark DataFrame + +To train the above algorithm on larger dataset, we can load the dataset into DataFrame and pass it to the `fit` method: + +```python +from sklearn import datasets, neighbors +from SystemML.mllearn import LogisticRegression +from pyspark.sql import SQLContext +import SystemML as sml +sqlCtx = SQLContext(sc) +digits = datasets.load_digits() +X_digits = digits.data +y_digits = digits.target + 1 +n_samples = len(X_digits) +# Split the data into training/testing sets and convert to PySpark DataFrame +df_train = sml.convertToLabeledDF(sqlContext, X_digits[:.9 * n_samples], y_digits[:.9 * n_samples]) +X_test = X_digits[.9 * n_samples:] +y_test = y_digits[.9 * n_samples:] +logistic = LogisticRegression(sqlCtx) +print('LogisticRegression score: %f' % logistic.fit(df_train).score(X_test, y_test)) +``` + +### MLPipeline interface + +In the below example, we demonstrate how the same `LogisticRegression` class can allow SystemML to fit seamlessly into +large data pipelines. 
+ +```python +# MLPipeline way +from pyspark.ml import Pipeline +from SystemML.mllearn import LogisticRegression +from pyspark.ml.feature import HashingTF, Tokenizer +from pyspark.sql import SQLContext +sqlCtx = SQLContext(sc) +training = sqlCtx.createDataFrame([ + (0L, "a b c d e spark", 1.0), + (1L, "b d", 2.0), + (2L, "spark f g h", 1.0), + (3L, "hadoop mapreduce", 2.0), + (4L, "b spark who", 1.0), + (5L, "g d a y", 2.0), + (6L, "spark fly", 1.0), + (7L, "was mapreduce", 2.0), + (8L, "e spark program", 1.0), + (9L, "a e c l", 2.0), + (10L, "spark compile", 1.0), + (11L, "hadoop software", 2.0) +], ["id", "text", "label"]) +tokenizer = Tokenizer(inputCol="text", outputCol="words") +hashingTF = HashingTF(inputCol="words", outputCol="features", numFeatures=20) +lr = LogisticRegression(sqlCtx) +pipeline = Pipeline(stages=[tokenizer, hashingTF, lr]) +model = pipeline.fit(training) +test = sqlCtx.createDataFrame([ + (12L, "spark i j k"), + (13L, "l m n"), + (14L, "mapreduce spark"), + (15L, "apache hadoop")], ["id", "text"]) +prediction = model.transform(test) +prediction.show() +``` + +## Invoking DML/PyDML scripts using MLContext + +TODO: This is work in progress. 
+ +```python +from sklearn import datasets, neighbors +from SystemML.mllearn import LogisticRegression +from pyspark.sql import DataFrame, SQLContext +import SystemML as sml +import pandas as pd +import os +sqlCtx = SQLContext(sc) +digits = datasets.load_digits() +X_digits = digits.data +y_digits = digits.target + 1 +n_samples = len(X_digits) +# Split the data into training/testing sets and convert to PySpark DataFrame +X_df = sqlCtx.createDataFrame(pd.DataFrame(X_digits[:.9 * n_samples])) +y_df = sqlCtx.createDataFrame(pd.DataFrame(y_digits[:.9 * n_samples])) +ml = sml.MLContext(sc) +script = os.path.join(os.environ['SYSTEMML_HOME'], 'scripts', 'algorithms', 'MultiLogReg.dml') +script = sml.dml(script).input(X=X_df, Y_vec=y_df).out("B_out") +# .input($X=' ', $Y=' ', $B=' ') +beta = ml.execute(script).getNumPyArray('B_out') +``` http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/48a7267f/docs/index.md ---------------------------------------------------------------------- diff --git a/docs/index.md b/docs/index.md index 738e525..3fcece6 100644 --- a/docs/index.md +++ b/docs/index.md @@ -68,6 +68,8 @@ DML is a high-level R-like declarative language for machine learning. PyDML is a high-level Python-like declarative language for machine learning. * [Beginner's Guide to DML and PyDML](beginners-guide-to-dml-and-pydml) - An introduction to the basics of DML and PyDML. +* [Beginner's Guide for Python users](beginners-guide-python) - +Beginner's Guide for Python users. 
## ML Algorithms http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/48a7267f/src/main/java/org/apache/sysml/api/mlcontext/MLResults.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/api/mlcontext/MLResults.java b/src/main/java/org/apache/sysml/api/mlcontext/MLResults.java index dbc8f5d..605ba95 100644 --- a/src/main/java/org/apache/sysml/api/mlcontext/MLResults.java +++ b/src/main/java/org/apache/sysml/api/mlcontext/MLResults.java @@ -24,6 +24,8 @@ import java.util.Set; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.rdd.RDD; import org.apache.spark.sql.DataFrame; +import org.apache.sysml.hops.OptimizerUtils; +import org.apache.sysml.parser.Expression.ValueType; import org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.controlprogram.LocalVariableMap; import org.apache.sysml.runtime.controlprogram.caching.CacheException; @@ -37,7 +39,10 @@ import org.apache.sysml.runtime.instructions.cp.DoubleObject; import org.apache.sysml.runtime.instructions.cp.IntObject; import org.apache.sysml.runtime.instructions.cp.ScalarObject; import org.apache.sysml.runtime.instructions.cp.StringObject; +import org.apache.sysml.runtime.matrix.MatrixCharacteristics; +import org.apache.sysml.runtime.matrix.MatrixDimensionsMetaData; import org.apache.sysml.runtime.matrix.data.FrameBlock; +import org.apache.sysml.runtime.matrix.data.MatrixBlock; import org.apache.sysml.runtime.util.DataConverter; import scala.Tuple1; @@ -115,7 +120,21 @@ public class MLResults { */ public MatrixObject getMatrixObject(String outputName) { Data data = getData(outputName); - if (!(data instanceof MatrixObject)) { + if(data instanceof ScalarObject) { + double val = getDouble(outputName); + MatrixObject one_X_one_mo = new MatrixObject(ValueType.DOUBLE, " ", new MatrixDimensionsMetaData(new MatrixCharacteristics(1, 1, OptimizerUtils.DEFAULT_BLOCKSIZE, OptimizerUtils.DEFAULT_BLOCKSIZE, 1))); 
+ MatrixBlock mb = new MatrixBlock(1, 1, false); + mb.allocateDenseBlock(); + mb.setValue(0, 0, val); + try { + one_X_one_mo.acquireModify(mb); + one_X_one_mo.release(); + } catch (CacheException e) { + throw new RuntimeException(e); + } + return one_X_one_mo; + } + else if (!(data instanceof MatrixObject)) { throw new MLContextException("Variable '" + outputName + "' not a matrix"); } MatrixObject mo = (MatrixObject) data; http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/48a7267f/src/main/java/org/apache/sysml/api/python/SystemML.py ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/api/python/SystemML.py b/src/main/java/org/apache/sysml/api/python/SystemML.py index 689403e..3b8ae96 100644 --- a/src/main/java/org/apache/sysml/api/python/SystemML.py +++ b/src/main/java/org/apache/sysml/api/python/SystemML.py @@ -27,18 +27,7 @@ import os from pyspark.context import SparkContext from pyspark.sql import DataFrame, SQLContext from pyspark.rdd import RDD -import numpy as np -import pandas as pd -import sklearn as sk -from sklearn import metrics -from pyspark.ml.feature import VectorAssembler -from pyspark.mllib.linalg import Vectors -import sys -from pyspark.ml import Estimator, Model -from scipy.sparse import spmatrix -from scipy.sparse import coo_matrix -SUPPORTED_TYPES = (np.ndarray, pd.DataFrame, spmatrix) class MLContext(object): @@ -244,10 +233,6 @@ class MLOutput(object): return df except Py4JJavaError: traceback.print_exc() - - def getPandasDF(self, sqlContext, varName): - df = self.toDF(sqlContext, varName).sort('ID').drop('ID') - return df.toPandas() def getMLMatrix(self, sqlContext, varName): raise Exception('Not supported in Python MLContext') @@ -265,218 +250,3 @@ class MLOutput(object): #except Py4JJavaError: # traceback.print_exc() -def getNumCols(numPyArr): - if numPyArr.ndim == 1: - return 1 - else: - return numPyArr.shape[1] - -def convertToMatrixBlock(sc, src): - if 
isinstance(src, spmatrix): - src = coo_matrix(src, dtype=np.float64) - numRows = src.shape[0] - numCols = src.shape[1] - data = src.data - row = src.row.astype(np.int32) - col = src.col.astype(np.int32) - nnz = len(src.col) - buf1 = bytearray(data.tostring()) - buf2 = bytearray(row.tostring()) - buf3 = bytearray(col.tostring()) - return sc._jvm.org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtilsExt.convertSciPyCOOToMB(buf1, buf2, buf3, numRows, numCols, nnz) - elif isinstance(sc, SparkContext): - src = np.asarray(src) - numCols = getNumCols(src) - numRows = src.shape[0] - arr = src.ravel().astype(np.float64) - buf = bytearray(arr.tostring()) - return sc._jvm.org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtilsExt.convertPy4JArrayToMB(buf, numRows, numCols) - else: - raise TypeError('sc needs to be of type SparkContext') # TODO: We can generalize this by creating py4j gateway ourselves - - -def convertToNumpyArr(sc, mb): - if isinstance(sc, SparkContext): - numRows = mb.getNumRows() - numCols = mb.getNumColumns() - buf = sc._jvm.org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtilsExt.convertMBtoPy4JDenseArr(mb) - return np.frombuffer(buf, count=numRows*numCols, dtype=np.float64) - else: - raise TypeError('sc needs to be of type SparkContext') # TODO: We can generalize this by creating py4j gateway ourselves - -def convertToPandasDF(X): - if not isinstance(X, pd.DataFrame): - return pd.DataFrame(X, columns=['C' + str(i) for i in range(getNumCols(X))]) - return X - -def tolist(inputCols): - return list(inputCols) - -def assemble(sqlCtx, pdf, inputCols, outputCol): - tmpDF = sqlCtx.createDataFrame(pdf, tolist(pdf.columns)) - assembler = VectorAssembler(inputCols=tolist(inputCols), outputCol=outputCol) - return assembler.transform(tmpDF) - -class mllearn: - class BaseSystemMLEstimator(Estimator): - # TODO: Allow users to set featuresCol (with default 'features') and labelCol (with default 'label') - - # Returns a model after 
calling fit(df) on Estimator object on JVM - def _fit(self, X): - if hasattr(X, '_jdf') and 'features' in X.columns and 'label' in X.columns: - self.model = self.estimator.fit(X._jdf) - return self - else: - raise Exception('Incorrect usage: Expected dataframe as input with features/label as columns') - - # Returns a model after calling fit(X:MatrixBlock, y:MatrixBlock) on Estimator object on JVM - def fit(self, X, y=None, params=None): - if y is None: - return self._fit(X) - elif y is not None and isinstance(X, SUPPORTED_TYPES): - if self.transferUsingDF: - pdfX = convertToPandasDF(X) - pdfY = convertToPandasDF(y) - if getNumCols(pdfY) != 1: - raise Exception('y should be a column vector') - if pdfX.shape[0] != pdfY.shape[0]: - raise Exception('Number of rows of X and y should match') - colNames = pdfX.columns - pdfX['label'] = pdfY[pdfY.columns[0]] - df = assemble(self.sqlCtx, pdfX, colNames, 'features').select('features', 'label') - self.model = self.estimator.fit(df._jdf) - else: - numColsy = getNumCols(y) - if numColsy != 1: - raise Exception('Expected y to be a column vector') - self.model = self.estimator.fit(convertToMatrixBlock(self.sc, X), convertToMatrixBlock(self.sc, y)) - if self.setOutputRawPredictionsToFalse: - self.model.setOutputRawPredictions(False) - return self - else: - raise Exception('Unsupported input type') - - def transform(self, X): - return self.predict(X) - - # Returns either a DataFrame or MatrixBlock after calling transform(X:MatrixBlock, y:MatrixBlock) on Model object on JVM - def predict(self, X): - if isinstance(X, SUPPORTED_TYPES): - if self.transferUsingDF: - pdfX = convertToPandasDF(X) - df = assemble(self.sqlCtx, pdfX, pdfX.columns, 'features').select('features') - retjDF = self.model.transform(df._jdf) - retDF = DataFrame(retjDF, self.sqlCtx) - retPDF = retDF.sort('ID').select('prediction').toPandas() - if isinstance(X, np.ndarray): - return retPDF.as_matrix().flatten() - else: - return retPDF - else: - retNumPy = 
convertToNumpyArr(self.sc, self.model.transform(convertToMatrixBlock(self.sc, X))) - if isinstance(X, np.ndarray): - return retNumPy - else: - return retNumPy # TODO: Convert to Pandas - elif hasattr(X, '_jdf'): - if 'features' in X.columns: - # No need to assemble as input DF is likely coming via MLPipeline - df = X - else: - assembler = VectorAssembler(inputCols=X.columns, outputCol='features') - df = assembler.transform(X) - retjDF = self.model.transform(df._jdf) - retDF = DataFrame(retjDF, self.sqlCtx) - # Return DF - return retDF.sort('ID') - else: - raise Exception('Unsupported input type') - - class BaseSystemMLClassifier(BaseSystemMLEstimator): - - # Scores the predicted value with ground truth 'y' - def score(self, X, y): - return metrics.accuracy_score(y, self.predict(X)) - - class BaseSystemMLRegressor(BaseSystemMLEstimator): - - # Scores the predicted value with ground truth 'y' - def score(self, X, y): - return metrics.r2_score(y, self.predict(X), multioutput='variance_weighted') - - - # Or we can create new Python project with package structure - class LogisticRegression(BaseSystemMLClassifier): - - # See https://apache.github.io/incubator-systemml/algorithms-reference for usage - def __init__(self, sqlCtx, penalty='l2', fit_intercept=True, max_iter=100, max_inner_iter=0, tol=0.000001, C=1.0, solver='newton-cg', transferUsingDF=False): - self.sqlCtx = sqlCtx - self.sc = sqlCtx._sc - self.uid = "logReg" - self.estimator = self.sc._jvm.org.apache.sysml.api.ml.LogisticRegression(self.uid, self.sc._jsc.sc()) - self.estimator.setMaxOuterIter(max_iter) - self.estimator.setMaxInnerIter(max_inner_iter) - if C <= 0: - raise Exception('C has to be positive') - reg = 1.0 / C - self.estimator.setRegParam(reg) - self.estimator.setTol(tol) - self.estimator.setIcpt(int(fit_intercept)) - self.transferUsingDF = transferUsingDF - self.setOutputRawPredictionsToFalse = True - if penalty != 'l2': - raise Exception('Only l2 penalty is supported') - if solver != 
'newton-cg': - raise Exception('Only newton-cg solver supported') - - class LinearRegression(BaseSystemMLRegressor): - - # See https://apache.github.io/incubator-systemml/algorithms-reference for usage - def __init__(self, sqlCtx, fit_intercept=True, max_iter=100, tol=0.000001, C=1.0, solver='newton-cg', transferUsingDF=False): - self.sqlCtx = sqlCtx - self.sc = sqlCtx._sc - self.uid = "lr" - if solver == 'newton-cg' or solver == 'direct-solve': - self.estimator = self.sc._jvm.org.apache.sysml.api.ml.LinearRegression(self.uid, self.sc._jsc.sc(), solver) - else: - raise Exception('Only newton-cg solver supported') - self.estimator.setMaxIter(max_iter) - if C <= 0: - raise Exception('C has to be positive') - reg = 1.0 / C - self.estimator.setRegParam(reg) - self.estimator.setTol(tol) - self.estimator.setIcpt(int(fit_intercept)) - self.transferUsingDF = transferUsingDF - self.setOutputRawPredictionsToFalse = False - - - class SVM(BaseSystemMLClassifier): - - # See https://apache.github.io/incubator-systemml/algorithms-reference for usage - def __init__(self, sqlCtx, fit_intercept=True, max_iter=100, tol=0.000001, C=1.0, is_multi_class=False, transferUsingDF=False): - self.sqlCtx = sqlCtx - self.sc = sqlCtx._sc - self.uid = "svm" - self.estimator = self.sc._jvm.org.apache.sysml.api.ml.SVM(self.uid, self.sc._jsc.sc(), is_multi_class) - self.estimator.setMaxIter(max_iter) - if C <= 0: - raise Exception('C has to be positive') - reg = 1.0 / C - self.estimator.setRegParam(reg) - self.estimator.setTol(tol) - self.estimator.setIcpt(int(fit_intercept)) - self.transferUsingDF = transferUsingDF - self.setOutputRawPredictionsToFalse = False - - class NaiveBayes(BaseSystemMLClassifier): - - # See https://apache.github.io/incubator-systemml/algorithms-reference for usage - def __init__(self, sqlCtx, laplace=1.0, transferUsingDF=False): - self.sqlCtx = sqlCtx - self.sc = sqlCtx._sc - self.uid = "nb" - self.estimator = self.sc._jvm.org.apache.sysml.api.ml.NaiveBayes(self.uid, 
self.sc._jsc.sc()) - self.estimator.setLaplace(laplace) - self.transferUsingDF = transferUsingDF - self.setOutputRawPredictionsToFalse = False \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/48a7267f/src/main/java/org/apache/sysml/api/python/test.py ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/api/python/test.py b/src/main/java/org/apache/sysml/api/python/test.py deleted file mode 100644 index 21a1f79..0000000 --- a/src/main/java/org/apache/sysml/api/python/test.py +++ /dev/null @@ -1,178 +0,0 @@ -#!/usr/bin/python -#------------------------------------------------------------- -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. 
-# -#------------------------------------------------------------- -from sklearn import datasets, neighbors -import SystemML as sml -from pyspark.sql import SQLContext -from pyspark.context import SparkContext -import unittest -from pyspark.ml.evaluation import MulticlassClassificationEvaluator -from pyspark.ml import Pipeline -from pyspark.ml.feature import HashingTF, Tokenizer -import numpy as np -from sklearn.datasets import fetch_20newsgroups -from sklearn.feature_extraction.text import TfidfVectorizer -from sklearn import metrics - -sc = SparkContext() -sqlCtx = SQLContext(sc) - -# Currently not integrated with JUnit test -# ~/spark-1.6.1-scala-2.11/bin/spark-submit --master local[*] --driver-class-path SystemML.jar test.py -class TestMLLearn(unittest.TestCase): - def testLogisticSK1(self): - digits = datasets.load_digits() - X_digits = digits.data - y_digits = digits.target - n_samples = len(X_digits) - X_train = X_digits[:.9 * n_samples] - y_train = y_digits[:.9 * n_samples] - X_test = X_digits[.9 * n_samples:] - y_test = y_digits[.9 * n_samples:] - logistic = sml.mllearn.LogisticRegression(sqlCtx) - score = logistic.fit(X_train, y_train).score(X_test, y_test) - self.failUnless(score > 0.9) - - def testLogisticSK2(self): - digits = datasets.load_digits() - X_digits = digits.data - y_digits = digits.target - n_samples = len(X_digits) - X_train = X_digits[:.9 * n_samples] - y_train = y_digits[:.9 * n_samples] - X_test = X_digits[.9 * n_samples:] - y_test = y_digits[.9 * n_samples:] - # Convert to DataFrame for i/o: current way to transfer data - logistic = sml.mllearn.LogisticRegression(sqlCtx, transferUsingDF=True) - score = logistic.fit(X_train, y_train).score(X_test, y_test) - self.failUnless(score > 0.9) - - def testLogisticMLPipeline1(self): - training = sqlCtx.createDataFrame([ - (0L, "a b c d e spark", 1.0), - (1L, "b d", 2.0), - (2L, "spark f g h", 1.0), - (3L, "hadoop mapreduce", 2.0), - (4L, "b spark who", 1.0), - (5L, "g d a y", 2.0), - (6L, "spark 
fly", 1.0), - (7L, "was mapreduce", 2.0), - (8L, "e spark program", 1.0), - (9L, "a e c l", 2.0), - (10L, "spark compile", 1.0), - (11L, "hadoop software", 2.0) - ], ["id", "text", "label"]) - tokenizer = Tokenizer(inputCol="text", outputCol="words") - hashingTF = HashingTF(inputCol="words", outputCol="features", numFeatures=20) - lr = sml.mllearn.LogisticRegression(sqlCtx) - pipeline = Pipeline(stages=[tokenizer, hashingTF, lr]) - model = pipeline.fit(training) - test = sqlCtx.createDataFrame([ - (12L, "spark i j k", 1.0), - (13L, "l m n", 2.0), - (14L, "mapreduce spark", 1.0), - (15L, "apache hadoop", 2.0)], ["id", "text", "label"]) - result = model.transform(test) - predictionAndLabels = result.select("prediction", "label") - evaluator = MulticlassClassificationEvaluator() - score = evaluator.evaluate(predictionAndLabels) - self.failUnless(score == 1.0) - - def testLinearRegressionSK1(self): - diabetes = datasets.load_diabetes() - diabetes_X = diabetes.data[:, np.newaxis, 2] - diabetes_X_train = diabetes_X[:-20] - diabetes_X_test = diabetes_X[-20:] - diabetes_y_train = diabetes.target[:-20] - diabetes_y_test = diabetes.target[-20:] - regr = sml.mllearn.LinearRegression(sqlCtx) - regr.fit(diabetes_X_train, diabetes_y_train) - score = regr.score(diabetes_X_test, diabetes_y_test) - self.failUnless(score > 0.4) # TODO: Improve r2-score (may be I am using it incorrectly) - - def testLinearRegressionSK2(self): - diabetes = datasets.load_diabetes() - diabetes_X = diabetes.data[:, np.newaxis, 2] - diabetes_X_train = diabetes_X[:-20] - diabetes_X_test = diabetes_X[-20:] - diabetes_y_train = diabetes.target[:-20] - diabetes_y_test = diabetes.target[-20:] - regr = sml.mllearn.LinearRegression(sqlCtx, transferUsingDF=True) - regr.fit(diabetes_X_train, diabetes_y_train) - score = regr.score(diabetes_X_test, diabetes_y_test) - self.failUnless(score > 0.4) # TODO: Improve r2-score (may be I am using it incorrectly) - - def testSVMSK1(self): - digits = datasets.load_digits() - 
X_digits = digits.data - y_digits = digits.target - n_samples = len(X_digits) - X_train = X_digits[:.9 * n_samples] - y_train = y_digits[:.9 * n_samples] - X_test = X_digits[.9 * n_samples:] - y_test = y_digits[.9 * n_samples:] - svm = sml.mllearn.SVM(sqlCtx, is_multi_class=True) - score = svm.fit(X_train, y_train).score(X_test, y_test) - self.failUnless(score > 0.9) - - def testSVMSK2(self): - digits = datasets.load_digits() - X_digits = digits.data - y_digits = digits.target - n_samples = len(X_digits) - X_train = X_digits[:.9 * n_samples] - y_train = y_digits[:.9 * n_samples] - X_test = X_digits[.9 * n_samples:] - y_test = y_digits[.9 * n_samples:] - svm = sml.mllearn.SVM(sqlCtx, is_multi_class=True, transferUsingDF=True) - score = svm.fit(X_train, y_train).score(X_test, y_test) - self.failUnless(score > 0.9) - - def testNaiveBayesSK1(self): - digits = datasets.load_digits() - X_digits = digits.data - y_digits = digits.target - n_samples = len(X_digits) - X_train = X_digits[:.9 * n_samples] - y_train = y_digits[:.9 * n_samples] - X_test = X_digits[.9 * n_samples:] - y_test = y_digits[.9 * n_samples:] - nb = sml.mllearn.NaiveBayes(sqlCtx) - score = nb.fit(X_train, y_train).score(X_test, y_test) - self.failUnless(score > 0.85) - - def testNaiveBayesSK2(self): - categories = ['alt.atheism', 'talk.religion.misc', 'comp.graphics', 'sci.space'] - newsgroups_train = fetch_20newsgroups(subset='train', categories=categories) - newsgroups_test = fetch_20newsgroups(subset='test', categories=categories) - vectorizer = TfidfVectorizer() - # Both vectors and vectors_test are SciPy CSR matrix - vectors = vectorizer.fit_transform(newsgroups_train.data) - vectors_test = vectorizer.transform(newsgroups_test.data) - nb = sml.mllearn.NaiveBayes(sqlCtx) - nb.fit(vectors, newsgroups_train.target) - pred = nb.predict(vectors_test) - score = metrics.f1_score(newsgroups_test.target, pred, average='weighted') - self.failUnless(score > 0.8) - - -if __name__ == '__main__': - 
unittest.main() http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/48a7267f/src/main/python/MANIFEST.in ---------------------------------------------------------------------- diff --git a/src/main/python/MANIFEST.in b/src/main/python/MANIFEST.in new file mode 100644 index 0000000..a185263 --- /dev/null +++ b/src/main/python/MANIFEST.in @@ -0,0 +1,29 @@ +#------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# +#------------------------------------------------------------- +include SystemML/SystemML-java/LICENSE +include SystemML/SystemML-java/SystemML-config.xml +include SystemML/SystemML-java/NOTICE +include SystemML/SystemML-java/SystemML.jar +include SystemML/SystemML-java/DISCLAIMER +include SystemML/SystemML-java/scripts/sparkDML.sh +recursive-include SystemML/SystemML-java/scripts/algorithms * +recursive-include SystemML/SystemML-java/scripts/datagen * +recursive-include SystemML/SystemML-java/scripts/utils * \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/48a7267f/src/main/python/SystemML.py ---------------------------------------------------------------------- diff --git a/src/main/python/SystemML.py b/src/main/python/SystemML.py deleted file mode 100644 index 7142a9d..0000000 --- a/src/main/python/SystemML.py +++ /dev/null @@ -1,258 +0,0 @@ -#------------------------------------------------------------- -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. 
-# -#------------------------------------------------------------- -import os - -from py4j.java_gateway import JavaObject -from pyspark import SparkContext -import pyspark.mllib.common - - -def dml(scriptString): - """ - Create a dml script object based on a string. - - Parameters - ---------- - scriptString: string - Can be a path to a dml script or a dml script itself. - - Returns - ------- - script: Script instance - Instance of a script object. - """ - if not isinstance(scriptString, str): - raise ValueError("scriptString should be a string, got %s" % type(scriptString)) - return Script(scriptString, scriptType="dml") - - -def pydml(scriptString): - """ - Create a pydml script object based on a string. - - Parameters - ---------- - scriptString: string - Can be a path to a pydml script or a pydml script itself. - - Returns - ------- - script: Script instance - Instance of a script object. - """ - if not isinstance(scriptString, str): - raise ValueError("scriptString should be a string, got %s" % type(scriptString)) - return Script(scriptString, scriptType="pydml") - - -def _java2py(sc, obj): - """ Convert Java object to Python. """ - # TODO: Port this private PySpark function. - obj = pyspark.mllib.common._java2py(sc, obj) - if isinstance(obj, JavaObject): - class_name = obj.getClass().getSimpleName() - if class_name == 'Matrix': - obj = Matrix(obj, sc) - return obj - - -def _py2java(sc, obj): - """ Convert Python object to Java. """ - if isinstance(obj, Matrix): - obj = obj._java_matrix - # TODO: Port this private PySpark function. - obj = pyspark.mllib.common._py2java(sc, obj) - return obj - - -class Matrix(object): - """ - Wrapper around a Java Matrix object. - - Parameters - ---------- - javaMatrix: JavaObject - A Java Matrix object as returned by calling `ml.execute().get()`. 
- - sc: SparkContext - SparkContext - """ - def __init__(self, javaMatrix, sc): - self._java_matrix = javaMatrix - self.sc = sc - - def __repr__(self): - return "Matrix" - - def toDF(self): - """ - Convert the Matrix to a PySpark SQL DataFrame. - - Returns - ------- - df: PySpark SQL DataFrame - A PySpark SQL DataFrame representing the matrix, with - one "ID" column containing the row index (since Spark - DataFrames are unordered), followed by columns of doubles - for each column in the matrix. - """ - jdf = self._java_matrix.asDataFrame() - df = _java2py(self.sc, jdf) - return df - - -class MLResults(object): - """ - Wrapper around a Java ML Results object. - - Parameters - ---------- - results: JavaObject - A Java MLResults object as returned by calling `ml.execute()`. - - sc: SparkContext - SparkContext - """ - def __init__(self, results, sc): - self._java_results = results - self.sc = sc - - def __repr__(self): - return "MLResults" - - def get(self, *outputs): - """ - Parameters - ---------- - outputs: string, list of strings - Output variables as defined inside the DML script. - """ - outs = [_java2py(self.sc, self._java_results.get(out)) for out in outputs] - if len(outs) == 1: - return outs[0] - return outs - - -class Script(object): - """ - Instance of a DML/PyDML Script. - - Parameters - ---------- - scriptString: string - Can be either a file path to a DML script or a DML script itself. - - scriptType: string - Script language, either "dml" for DML (R-like) or "pydml" for PyDML (Python-like). - """ - def __init__(self, scriptString, scriptType="dml"): - self.scriptString = scriptString - self.scriptType = scriptType - self._input = {} - self._output = [] - - def input(self, *args, **kwargs): - """ - Parameters - ---------- - args: name, value tuple - where name is a string, and currently supported value formats - are double, string, dataframe, rdd, and list of such object. 
- - kwargs: dict of name, value pairs - To know what formats are supported for name and value, look above. - """ - if args and len(args) != 2: - raise ValueError("Expected name, value pair.") - elif args: - self._input[args[0]] = args[1] - for name, value in kwargs.items(): - self._input[name] = value - return self - - def out(self, *names): - """ - Parameters - ---------- - names: string, list of strings - Output variables as defined inside the DML script. - """ - self._output.extend(names) - return self - - -class MLContext(object): - """ - Wrapper around the new SystemML MLContext. - - Parameters - ---------- - sc: SparkContext - SparkContext - """ - def __init__(self, sc): - if not isinstance(sc, SparkContext): - raise ValueError("Expected sc to be a SparkContext, got " % sc) - self._sc = sc - self._ml = sc._jvm.org.apache.sysml.api.mlcontext.MLContext(sc._jsc) - - def __repr__(self): - return "MLContext" - - def execute(self, script): - """ - Execute a DML / PyDML script. - - Parameters - ---------- - script: Script instance - Script instance defined with the appropriate input and output variables. - - Returns - ------- - ml_results: MLResults - MLResults instance. 
- """ - if not isinstance(script, Script): - raise ValueError("Expected script to be an instance of Script") - scriptString = script.scriptString - if script.scriptType == "dml": - if scriptString.endswith(".dml"): - if os.path.exists(scriptString): - script_java = self._sc._jvm.org.apache.sysml.api.mlcontext.ScriptFactory.dmlFromFile(scriptString) - else: - raise ValueError("path: %s does not exist" % scriptString) - else: - script_java = self._sc._jvm.org.apache.sysml.api.mlcontext.ScriptFactory.dml(scriptString) - elif script.scriptType == "pydml": - if scriptString.endswith(".pydml"): - if os.path.exists(scriptString): - script_java = self._sc._jvm.org.apache.sysml.api.mlcontext.ScriptFactory.pydmlFromFile(scriptString) - else: - raise ValueError("path: %s does not exist" % scriptString) - else: - script_java = self._sc._jvm.org.apache.sysml.api.mlcontext.ScriptFactory.pydml(scriptString) - - for key, val in script._input.items(): - script_java.input(key, _py2java(self._sc, val)) - for val in script._output: - script_java.out(val) - return MLResults(self._ml.execute(script_java), self._sc) http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/48a7267f/src/main/python/SystemML/__init__.py ---------------------------------------------------------------------- diff --git a/src/main/python/SystemML/__init__.py b/src/main/python/SystemML/__init__.py new file mode 100644 index 0000000..02a940b --- /dev/null +++ b/src/main/python/SystemML/__init__.py @@ -0,0 +1,29 @@ +#!/usr/bin/python +#------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +#------------------------------------------------------------- + +from .mlcontext import * +from .defmatrix import * +from .converters import * + +__all__ = mlcontext.__all__ +__all__ += defmatrix.__all__ +__all__ += converters.__all__ \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/48a7267f/src/main/python/SystemML/converters.py ---------------------------------------------------------------------- diff --git a/src/main/python/SystemML/converters.py b/src/main/python/SystemML/converters.py new file mode 100644 index 0000000..9588bec --- /dev/null +++ b/src/main/python/SystemML/converters.py @@ -0,0 +1,100 @@ +#!/usr/bin/python +#------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# +#------------------------------------------------------------- + +from pyspark.context import SparkContext +from pyspark.sql import DataFrame, SQLContext +from pyspark.rdd import RDD +import numpy as np +import pandas as pd +import sklearn as sk + +from scipy.sparse import spmatrix +from scipy.sparse import coo_matrix + +SUPPORTED_TYPES = (np.ndarray, pd.DataFrame, spmatrix) + +def getNumCols(numPyArr): + if numPyArr.ndim == 1: + return 1 + else: + return numPyArr.shape[1] + +def convertToLabeledDF(sqlCtx, X, y=None): + from pyspark.ml.feature import VectorAssembler + if y is not None: + pd1 = pd.DataFrame(X) + pd2 = pd.DataFrame(y, columns=['label']) + pdf = pd.concat([pd1, pd2], axis=1) + inputColumns = ['C' + str(i) for i in pd1.columns] + outputColumns = inputColumns + ['label'] + else: + pdf = pd.DataFrame(X) + inputColumns = ['C' + str(i) for i in pdf.columns] + outputColumns = inputColumns + assembler = VectorAssembler(inputCols=inputColumns, outputCol='features') + out = assembler.transform(sqlCtx.createDataFrame(pdf, outputColumns)) + if y is not None: + return out.select('features', 'label') + else: + return out.select('features') + + +def convertToMatrixBlock(sc, src): + if isinstance(src, spmatrix): + src = coo_matrix(src, dtype=np.float64) + numRows = src.shape[0] + numCols = src.shape[1] + data = src.data + row = src.row.astype(np.int32) + col = src.col.astype(np.int32) + nnz = len(src.col) + buf1 = bytearray(data.tostring()) + buf2 = bytearray(row.tostring()) + buf3 = bytearray(col.tostring()) + return sc._jvm.org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtilsExt.convertSciPyCOOToMB(buf1, buf2, buf3, numRows, numCols, nnz) + elif isinstance(sc, SparkContext): + src = np.asarray(src) + numCols = getNumCols(src) + numRows = src.shape[0] + arr = src.ravel().astype(np.float64) + buf = bytearray(arr.tostring()) + return sc._jvm.org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtilsExt.convertPy4JArrayToMB(buf, numRows, 
numCols) + else: + raise TypeError('sc needs to be of type SparkContext') # TODO: We can generalize this by creating py4j gateway ourselves + + +def convertToNumpyArr(sc, mb): + if isinstance(sc, SparkContext): + numRows = mb.getNumRows() + numCols = mb.getNumColumns() + buf = sc._jvm.org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtilsExt.convertMBtoPy4JDenseArr(mb) + return np.frombuffer(buf, count=numRows*numCols, dtype=np.float64).reshape((numRows, numCols)) + else: + raise TypeError('sc needs to be of type SparkContext') # TODO: We can generalize this by creating py4j gateway ourselves + + +def convertToPandasDF(X): + if not isinstance(X, pd.DataFrame): + return pd.DataFrame(X, columns=['C' + str(i) for i in range(getNumCols(X))]) + return X + +__all__ = [ 'getNumCols', 'convertToMatrixBlock', 'convertToNumpyArr', 'convertToPandasDF', 'SUPPORTED_TYPES' , 'convertToLabeledDF'] http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/48a7267f/src/main/python/SystemML/defmatrix.py ---------------------------------------------------------------------- diff --git a/src/main/python/SystemML/defmatrix.py b/src/main/python/SystemML/defmatrix.py new file mode 100644 index 0000000..7e2c453 --- /dev/null +++ b/src/main/python/SystemML/defmatrix.py @@ -0,0 +1,295 @@ +#!/usr/bin/python +#------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +#------------------------------------------------------------- + +import numpy as np + +from . import pydml, MLContext +from .converters import * +from pyspark import SparkContext, RDD +from pyspark.sql import DataFrame, SQLContext + +def setSparkContext(sc): + """ + Before using the matrix, the user needs to invoke this function. + + Parameters + ---------- + sc: SparkContext + SparkContext + """ + matrix.ml = MLContext(sc) + matrix.sc = sc + +def checkIfMLContextIsSet(): + if matrix.ml is None: + raise Exception('Expected setSparkContext(sc) to be called.') + +class DMLOp(object): + def __init__(self, inputs, dml=None): + self.inputs = inputs + self.dml = dml + + def _visit(self, execute=True): + matrix.dml = matrix.dml + self.dml + + +def reset(): + for m in matrix.visited: + m.visited = False + matrix.visited = [] + +def binaryOp(lhs, rhs, opStr): + inputs = [] + if isinstance(lhs, matrix): + lhsStr = lhs.ID + inputs = [lhs] + elif isinstance(lhs, float) or isinstance(lhs, int): + lhsStr = str(lhs) + else: + raise TypeError('Incorrect type') + if isinstance(rhs, matrix): + rhsStr = rhs.ID + inputs = inputs + [rhs] + elif isinstance(rhs, float) or isinstance(rhs, int): + rhsStr = str(rhs) + else: + raise TypeError('Incorrect type') + dmlOp = DMLOp(inputs) + out = matrix(None, op=dmlOp) + dmlOp.dml = [out.ID, ' = ', lhsStr, opStr, rhsStr, '\n'] + return out + +def binaryMatrixFunction(X, Y, fnName): + if not isinstance(X, matrix) or not isinstance(Y, matrix): + raise TypeError('Incorrect input type. 
def solve(A, b):
    """
    Lazily builds the call ``solve(A, b)``.

    Parameters
    ----------
    A, b: operands, forwarded unchanged to binaryMatrixFunction (defined earlier in this file)

    Returns
    -------
    Whatever binaryMatrixFunction returns (a lazy matrix, judging by the visible tail of that function).
    """
    return binaryMatrixFunction(A, b, 'solve')


def eval(outputs, outputDF=False, execute=True):
    """
    Executes the unevaluated DML script and computes the matrices specified by outputs.

    Parameters
    ----------
    outputs: matrix or list of matrices
    outputDF: if True, the data of every output matrix is fetched as a PySpark DataFrame, otherwise as a NumPy array
    execute: if False, nothing is executed and the generated PyDML script is returned as a string

    Returns
    -------
    The generated PyDML script (str) when execute is False, otherwise None
    (the results are stored in the 'data' attribute of every output matrix).
    """
    # NOTE: the name deliberately shadows the builtin eval; it is part of the public API (see __all__).
    checkIfMLContextIsSet()
    # reset() is defined outside this section; presumably it clears the state left by the previous traversal.
    reset()
    matrix.dml = []
    matrix.script = pydml('')
    if isinstance(outputs, matrix):
        outputs = [ outputs ]
    elif not isinstance(outputs, list):
        raise TypeError('Incorrect input type')
    for m in outputs:
        # Mark the matrix as requested output before visiting so that _visit emits the save statement.
        m.output = True
        m._visit(execute=execute)
    if not execute:
        return ''.join(matrix.dml)
    matrix.script.scriptString = ''.join(matrix.dml)
    results = matrix.ml.execute(matrix.script)
    for m in outputs:
        if outputDF:
            m.data = results.getDataFrame(m.ID)
        else:
            m.data = results.getNumPyArray(m.ID)


# Instead of inheriting from np.matrix
class matrix(object):
    """
    Lazily evaluated matrix: elementary operations only record DML statements,
    the script is executed when the data is requested.
    """
    # Counter used to generate unique variable names (mVar1, mVar2, ...)
    systemmlVarID = 0
    # Fragments of the PyDML script generated by the current traversal
    dml = []
    # Script object and MLContext used by eval(); assigned outside this class body
    script = None
    ml = None
    # Matrices visited by the current traversal (kept for cleanup).
    # Note that every instance shadows this name with its own boolean flag.
    visited = []

    def __init__(self, data, op=None):
        """
        Constructs a lazy matrix

        Parameters
        ----------
        data: NumPy ndarray, Pandas DataFrame, scipy sparse matrix or PySpark DataFrame. (data cannot be None for external users, 'data=None' is used internally for lazy evaluation).
        op: node of the Abstract Syntax Tree created internally for lazy evaluation (required when data is None)

        Raises
        ------
        TypeError: if data has an unsupported type, or if both data and op are None
        """
        checkIfMLContextIsSet()
        self.visited = False
        matrix.systemmlVarID += 1
        self.output = False
        self.ID = 'mVar' + str(matrix.systemmlVarID)
        # Always define 'op' so that the attribute exists on data-backed matrices as well.
        self.op = op
        if isinstance(data, SUPPORTED_TYPES):
            self.data = data
        elif hasattr(data, '_jdf'):
            # Objects exposing '_jdf' are accepted as is (presumably PySpark DataFrames).
            self.data = data
        elif data is None and op is not None:
            self.data = None
        else:
            raise TypeError('Unsupported input type')

    def eval(self, outputDF=False):
        """
        Evaluates this matrix.

        Parameters
        ----------
        outputDF: if True, fetch the result as PySpark DataFrame, otherwise as NumPy array
        """
        # Forward the caller's choice; it used to be hard-coded to False,
        # which made toDataFrame() fetch a NumPy array.
        eval([self], outputDF=outputDF)

    def toPandas(self):
        """Evaluates the matrix if necessary and returns convertToPandasDF(self.data)."""
        if self.data is None:
            self.eval()
        return convertToPandasDF(self.data)

    def toNumPyArray(self):
        """Evaluates the matrix if necessary and returns its data, converting a PySpark DataFrame to a NumPy array."""
        if self.data is None:
            self.eval()
        if isinstance(self.data, DataFrame):
            # '.values' replaces as_matrix(), which is no longer available in recent pandas releases.
            self.data = self.data.toPandas().values
        # Always keep default format as NumPy array if possible
        return self.data

    def toDataFrame(self):
        """Evaluates the matrix if necessary and returns its data as PySpark DataFrame."""
        if self.data is None:
            self.eval(outputDF=True)
        if not isinstance(self.data, DataFrame):
            # The SQLContext is cached on MLResults; getattr guards against the attribute not being defined yet.
            if getattr(MLResults, 'sqlContext', None) is None:
                # matrix.sc is assigned outside this class body (presumably by setSparkContext).
                MLResults.sqlContext = SQLContext(matrix.sc)
            self.data = MLResults.sqlContext.createDataFrame(self.toPandas())
        return self.data

    def _visit(self, execute=True):
        """
        Appends the DML statements required to compute this matrix to matrix.dml
        and, if execute is True, registers the inputs/outputs on matrix.script.
        Every matrix is processed at most once per traversal. Returns self.
        """
        if self.visited:
            return self
        self.visited = True
        # for cleanup
        matrix.visited = matrix.visited + [ self ]
        if self.data is not None:
            # Data-backed matrix: emit a placeholder load statement and pass the data as script input.
            matrix.dml = matrix.dml + [ self.ID, ' = load(\" \", format=\"csv\")\n']
            if isinstance(self.data, DataFrame) and execute:
                matrix.script.input(self.ID, self.data)
            elif execute:
                matrix.script.input(self.ID, convertToMatrixBlock(matrix.sc, self.data))
            return self
        elif self.op is not None:
            # Lazy matrix: the operands have to be emitted before the operation itself.
            for m in self.op.inputs:
                m._visit(execute=execute)
            self.op._visit(execute=execute)
        else:
            raise Exception('Expected either op or data to be set')
        if self.data is None and self.output:
            matrix.dml = matrix.dml + ['save(', self.ID, ', \" \")\n']
            if execute:
                matrix.script.out(self.ID)
        return self

    def __repr__(self):
        """
        Prints a description of the backing storage (for unevaluated matrices also
        the generated PyDML script) and returns a fixed placeholder string.
        """
        if self.data is None:
            print('# This matrix (' + self.ID + ') is backed by below given PyDML script (which is not yet evaluated). To fetch the data of this matrix, invoke toNumPyArray() or toDataFrame() or toPandas() methods.\n' + eval([self], execute=False))
        elif isinstance(self.data, DataFrame):
            print('# This matrix (' + self.ID + ') is backed by PySpark DataFrame. To fetch the DataFrame, invoke toDataFrame() method.')
        else:
            print('# This matrix (' + self.ID + ') is backed by NumPy array. To fetch the NumPy array, invoke toNumPyArray() method.')
        return '<SystemML.defmatrix.matrix object>'

    # Binary operators: every method only records the operation via binaryOp (defined outside this section).
    def __add__(self, other):
        return binaryOp(self, other, ' + ')

    def __sub__(self, other):
        return binaryOp(self, other, ' - ')

    def __mul__(self, other):
        return binaryOp(self, other, ' * ')

    def __floordiv__(self, other):
        return binaryOp(self, other, ' // ')

    def __div__(self, other):
        return binaryOp(self, other, ' / ')

    def __truediv__(self, other):
        # Python 3 (and 'from __future__ import division') dispatches '/' here instead of __div__.
        return binaryOp(self, other, ' / ')

    def __mod__(self, other):
        return binaryOp(self, other, ' % ')

    def __pow__(self, other):
        return binaryOp(self, other, ' ** ')

    # Reflected variants, used when the left operand is not a matrix.
    def __radd__(self, other):
        return binaryOp(other, self, ' + ')

    def __rsub__(self, other):
        return binaryOp(other, self, ' - ')

    def __rmul__(self, other):
        return binaryOp(other, self, ' * ')

    def __rfloordiv__(self, other):
        return binaryOp(other, self, ' // ')

    def __rdiv__(self, other):
        return binaryOp(other, self, ' / ')

    def __rtruediv__(self, other):
        # Python 3 counterpart of __rdiv__.
        return binaryOp(other, self, ' / ')

    def __rmod__(self, other):
        return binaryOp(other, self, ' % ')

    def __rpow__(self, other):
        return binaryOp(other, self, ' ** ')

    # Unary functions: every method records a call 'fnName(matrix[, axis=...])' via _aggFn.
    def sum(self, axis=None):
        return self._aggFn('sum', axis)

    def mean(self, axis=None):
        return self._aggFn('mean', axis)

    def max(self, axis=None):
        return self._aggFn('max', axis)

    def min(self, axis=None):
        return self._aggFn('min', axis)

    def argmin(self, axis=None):
        return self._aggFn('argmin', axis)

    def argmax(self, axis=None):
        return self._aggFn('argmax', axis)

    def cumsum(self, axis=None):
        return self._aggFn('cumsum', axis)

    def transpose(self, axis=None):
        return self._aggFn('transpose', axis)

    def trace(self, axis=None):
        return self._aggFn('trace', axis)

    def _aggFn(self, fnName, axis):
        """
        Creates a lazy matrix for the statement 'out = fnName(self)' or,
        if axis is not None, 'out = fnName(self, axis=<axis>)'.
        """
        dmlOp = DMLOp([self])
        out = matrix(None, op=dmlOp)
        if axis is None:
            dmlOp.dml = [out.ID, ' = ', fnName, '(', self.ID, ')\n']
        else:
            dmlOp.dml = [out.ID, ' = ', fnName, '(', self.ID, ', axis=', str(axis) ,')\n']
        return out

    def dot(self, other):
        """Lazily builds the call ``dot(self, other)``."""
        return binaryMatrixFunction(self, other, 'dot')

__all__ = [ 'setSparkContext', 'matrix', 'eval', 'solve']
+# +#------------------------------------------------------------- +import os + +try: + from py4j.java_gateway import JavaObject +except ImportError: + raise ImportError('Unable to import JavaObject from py4j.java_gateway. Hint: Make sure you are running with pyspark') + +from pyspark import SparkContext +import pyspark.mllib.common +from pyspark.sql import DataFrame, SQLContext +from .converters import * + +def dml(scriptString): + """ + Create a dml script object based on a string. + + Parameters + ---------- + scriptString: string + Can be a path to a dml script or a dml script itself. + + Returns + ------- + script: Script instance + Instance of a script object. + """ + if not isinstance(scriptString, str): + raise ValueError("scriptString should be a string, got %s" % type(scriptString)) + return Script(scriptString, scriptType="dml") + + +def pydml(scriptString): + """ + Create a pydml script object based on a string. + + Parameters + ---------- + scriptString: string + Can be a path to a pydml script or a pydml script itself. + + Returns + ------- + script: Script instance + Instance of a script object. + """ + if not isinstance(scriptString, str): + raise ValueError("scriptString should be a string, got %s" % type(scriptString)) + return Script(scriptString, scriptType="pydml") + + +def _java2py(sc, obj): + """ Convert Java object to Python. """ + # TODO: Port this private PySpark function. + obj = pyspark.mllib.common._java2py(sc, obj) + if isinstance(obj, JavaObject): + class_name = obj.getClass().getSimpleName() + if class_name == 'Matrix': + obj = Matrix(obj, sc) + return obj + + +def _py2java(sc, obj): + """ Convert Python object to Java. """ + if isinstance(obj, Matrix): + obj = obj._java_matrix + # TODO: Port this private PySpark function. + obj = pyspark.mllib.common._py2java(sc, obj) + return obj + + +class Matrix(object): + """ + Wrapper around a Java Matrix object. 
+ + Parameters + ---------- + javaMatrix: JavaObject + A Java Matrix object as returned by calling `ml.execute().get()`. + + sc: SparkContext + SparkContext + """ + def __init__(self, javaMatrix, sc): + self._java_matrix = javaMatrix + self.sc = sc + + def __repr__(self): + return "Matrix" + + def toDF(self): + """ + Convert the Matrix to a PySpark SQL DataFrame. + + Returns + ------- + df: PySpark SQL DataFrame + A PySpark SQL DataFrame representing the matrix, with + one "ID" column containing the row index (since Spark + DataFrames are unordered), followed by columns of doubles + for each column in the matrix. + """ + jdf = self._java_matrix.asDataFrame() + df = _java2py(self.sc, jdf) + return df + + +class MLResults(object): + """ + Wrapper around a Java ML Results object. + + Parameters + ---------- + results: JavaObject + A Java MLResults object as returned by calling `ml.execute()`. + + sc: SparkContext + SparkContext + """ + def __init__(self, results, sc): + self._java_results = results + self.sc = sc + try: + if MLResults.sqlContext is None: + MLResults.sqlContext = SQLContext(sc) + except AttributeError: + MLResults.sqlContext = SQLContext(sc) + + def __repr__(self): + return "MLResults" + + def getNumPyArray(self, *outputs): + """ + Parameters + ---------- + outputs: string, list of strings + Output variables as defined inside the DML script. + """ + outs = [convertToNumpyArr(self.sc, self._java_results.getMatrix(out).asBinaryBlockMatrix().getMatrixBlock()) for out in outputs] + if len(outs) == 1: + return outs[0] + return outs + + def getDataFrame(self, *outputs): + """ + Parameters + ---------- + outputs: string, list of strings + Output variables as defined inside the DML script. 
+ """ + outs = [DataFrame(self._java_results.getDataFrame(out), MLResults.sqlContext) for out in outputs] + if len(outs) == 1: + return outs[0] + return outs + + def get(self, *outputs): + """ + Parameters + ---------- + outputs: string, list of strings + Output variables as defined inside the DML script. + """ + outs = [_java2py(self.sc, self._java_results.get(out)) for out in outputs] + if len(outs) == 1: + return outs[0] + return outs + + +class Script(object): + """ + Instance of a DML/PyDML Script. + + Parameters + ---------- + scriptString: string + Can be either a file path to a DML script or a DML script itself. + + scriptType: string + Script language, either "dml" for DML (R-like) or "pydml" for PyDML (Python-like). + """ + def __init__(self, scriptString, scriptType="dml"): + self.scriptString = scriptString + self.scriptType = scriptType + self._input = {} + self._output = [] + + def input(self, *args, **kwargs): + """ + Parameters + ---------- + args: name, value tuple + where name is a string, and currently supported value formats + are double, string, dataframe, rdd, and list of such object. + + kwargs: dict of name, value pairs + To know what formats are supported for name and value, look above. + """ + if args and len(args) != 2: + raise ValueError("Expected name, value pair.") + elif args: + self._input[args[0]] = args[1] + for name, value in kwargs.items(): + self._input[name] = value + return self + + def out(self, *names): + """ + Parameters + ---------- + names: string, list of strings + Output variables as defined inside the DML script. + """ + self._output.extend(names) + return self + + +class MLContext(object): + """ + Wrapper around the new SystemML MLContext. 
+ + Parameters + ---------- + sc: SparkContext + SparkContext + """ + def __init__(self, sc): + if not isinstance(sc, SparkContext): + raise ValueError("Expected sc to be a SparkContext, got " % sc) + self._sc = sc + self._ml = sc._jvm.org.apache.sysml.api.mlcontext.MLContext(sc._jsc) + + def __repr__(self): + return "MLContext" + + def execute(self, script): + """ + Execute a DML / PyDML script. + + Parameters + ---------- + script: Script instance + Script instance defined with the appropriate input and output variables. + + Returns + ------- + ml_results: MLResults + MLResults instance. + """ + if not isinstance(script, Script): + raise ValueError("Expected script to be an instance of Script") + scriptString = script.scriptString + if script.scriptType == "dml": + if scriptString.endswith(".dml"): + if os.path.exists(scriptString): + script_java = self._sc._jvm.org.apache.sysml.api.mlcontext.ScriptFactory.dmlFromFile(scriptString) + else: + raise ValueError("path: %s does not exist" % scriptString) + else: + script_java = self._sc._jvm.org.apache.sysml.api.mlcontext.ScriptFactory.dml(scriptString) + elif script.scriptType == "pydml": + if scriptString.endswith(".pydml"): + if os.path.exists(scriptString): + script_java = self._sc._jvm.org.apache.sysml.api.mlcontext.ScriptFactory.pydmlFromFile(scriptString) + else: + raise ValueError("path: %s does not exist" % scriptString) + else: + script_java = self._sc._jvm.org.apache.sysml.api.mlcontext.ScriptFactory.pydml(scriptString) + + for key, val in script._input.items(): + script_java.input(key, _py2java(self._sc, val)) + for val in script._output: + script_java.out(val) + return MLResults(self._ml.execute(script_java), self._sc) + + +__all__ = ['MLResults', 'MLContext', 'Script', 'dml', 'pydml'] \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/48a7267f/src/main/python/SystemML/mllearn/__init__.py ---------------------------------------------------------------------- diff 
--git a/src/main/python/SystemML/mllearn/__init__.py b/src/main/python/SystemML/mllearn/__init__.py new file mode 100644 index 0000000..69cab58 --- /dev/null +++ b/src/main/python/SystemML/mllearn/__init__.py @@ -0,0 +1,25 @@ +#!/usr/bin/python +#------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +#------------------------------------------------------------- + +from .estimators import * + +__all__ = estimators.__all__ \ No newline at end of file
