Repository: systemml Updated Branches: refs/heads/master 54a11eed3 -> 8ffa3d158
[SYSTEMML-445] Allow users to pass the file paths to the binary blocked, csv and ijv datasets to mllearn classes. - This allows the advance users who already have data materialized in binary blocked formats to avoid conversion overhead. - Also, this facility is useful for benchmarking the performance of Keras2DML and Caffe2DML. Project: http://git-wip-us.apache.org/repos/asf/systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/8ffa3d15 Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/8ffa3d15 Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/8ffa3d15 Branch: refs/heads/master Commit: 8ffa3d158fe97e0871bcd4b77fa27504b8b85502 Parents: 54a11ee Author: Niketan Pansare <[email protected]> Authored: Tue Feb 27 13:21:32 2018 -0800 Committer: Niketan Pansare <[email protected]> Committed: Tue Feb 27 13:21:32 2018 -0800 ---------------------------------------------------------------------- src/main/python/systemml/mllearn/estimators.py | 28 +++++++++-- .../org/apache/sysml/api/dl/Caffe2DML.scala | 22 +++++++++ .../sysml/api/ml/BaseSystemMLClassifier.scala | 49 ++++++++++++++++++++ .../sysml/api/ml/BaseSystemMLRegressor.scala | 23 +++++++++ .../apache/sysml/api/ml/LinearRegression.scala | 9 +++- .../sysml/api/ml/LogisticRegression.scala | 7 +++ .../org/apache/sysml/api/ml/NaiveBayes.scala | 7 +++ .../scala/org/apache/sysml/api/ml/SVM.scala | 7 +++ 8 files changed, 145 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/systemml/blob/8ffa3d15/src/main/python/systemml/mllearn/estimators.py ---------------------------------------------------------------------- diff --git a/src/main/python/systemml/mllearn/estimators.py b/src/main/python/systemml/mllearn/estimators.py index 3f11d3f..de8aeb9 100644 --- a/src/main/python/systemml/mllearn/estimators.py +++ b/src/main/python/systemml/mllearn/estimators.py @@ -186,7 +186,19 @@ class BaseSystemMLEstimator(Estimator): self.X = None self.y = None return self - + + def fit_file(self, X_file, y_file): + global default_jvm_stdout, default_jvm_stdout_parallel_flush + try: + if default_jvm_stdout: + with jvm_stdout(parallel_flush=default_jvm_stdout_parallel_flush): + self.model = self.estimator.fit(X_file, y_file) + else: + self.model = self.estimator.fit(X_file, y_file) + except Py4JError: + traceback.print_exc() + return self + # Returns a model after calling fit(df) on Estimator object on JVM def _fit(self, X): """ @@ -207,12 +219,14 @@ class BaseSystemMLEstimator(Estimator): Parameters ---------- - X: NumPy ndarray, Pandas DataFrame, scipy sparse matrix - y: NumPy ndarray, Pandas DataFrame, scipy sparse matrix + X: NumPy ndarray, Pandas DataFrame, scipy sparse matrix, Spark DataFrame, file path + y: NumPy ndarray, Pandas DataFrame, scipy sparse matrix, file path """ if y is None: return self._fit(X) - elif y is not None and isinstance(X, SUPPORTED_TYPES) and isinstance(y, SUPPORTED_TYPES): + elif isinstance(X, str) and isinstance(y, str): + return self.fit_file(X, y) + elif isinstance(X, SUPPORTED_TYPES) and isinstance(y, SUPPORTED_TYPES): # Donot encode if y is a numpy matrix => useful for segmentation skipEncodingY = len(y.shape) == 2 and y.shape[0] != 1 and y.shape[1] != 1 y = y if skipEncodingY else self.encode(y) @@ -307,6 +321,8 @@ class BaseSystemMLEstimator(Estimator): except AttributeError: pass try: + if isinstance(X, str): + return self.model.transform_probability(X) jX = self._convertPythonXToJavaObject(X) if default_jvm_stdout: with jvm_stdout(parallel_flush=default_jvm_stdout_parallel_flush): @@ -323,7 +339,7 @@ class BaseSystemMLEstimator(Estimator): Parameters ---------- - X: NumPy ndarray, Pandas DataFrame, scipy sparse matrix or PySpark DataFrame + X: NumPy ndarray, Pandas DataFrame, scipy sparse matrix or PySpark DataFrame or file path """ global default_jvm_stdout, default_jvm_stdout_parallel_flush try: @@ -332,6 +348,8 @@ class BaseSystemMLEstimator(Estimator): except AttributeError: pass try: + if isinstance(X, str): + return self.model.transform(X) jX = self._convertPythonXToJavaObject(X) if default_jvm_stdout: with jvm_stdout(parallel_flush=default_jvm_stdout_parallel_flush): http://git-wip-us.apache.org/repos/asf/systemml/blob/8ffa3d15/src/main/scala/org/apache/sysml/api/dl/Caffe2DML.scala ---------------------------------------------------------------------- diff --git a/src/main/scala/org/apache/sysml/api/dl/Caffe2DML.scala b/src/main/scala/org/apache/sysml/api/dl/Caffe2DML.scala index da72403..26e554f 100644 --- a/src/main/scala/org/apache/sysml/api/dl/Caffe2DML.scala +++ b/src/main/scala/org/apache/sysml/api/dl/Caffe2DML.scala @@ -206,6 +206,10 @@ class Caffe2DML(val sc: SparkContext, val that = new Caffe2DML(sc, solverParam, solver, net, lrPolicy, numChannels, height, width) copyValues(that, extra) } + def fit(X_file: String, y_file: String): Caffe2DMLModel = { + mloutput = baseFit(X_file, y_file, sc) + new Caffe2DMLModel(this) + } // Note: will update the y_mb as this will be called by Python mllearn def fit(X_mb: MatrixBlock, y_mb: MatrixBlock): Caffe2DMLModel = { mloutput = baseFit(X_mb, y_mb, sc) @@ -822,6 +826,15 @@ class Caffe2DMLModel(val numClasses: String, val sc: SparkContext, val solver: C def baseEstimator(): BaseSystemMLEstimator = estimator // Prediction + def transform(X_file: String): String = + if (estimator.isClassification) { + Caffe2DML.LOG.debug("Prediction assuming classification") + baseTransform(X_file, sc, "Prob") + } else { + Caffe2DML.LOG.debug("Prediction assuming segmentation") + val outShape = estimator.getOutputShapeOfLastLayer + baseTransform(X_file, sc, "Prob", outShape._1.toInt, outShape._2.toInt, outShape._3.toInt) + } def transform(X: MatrixBlock): MatrixBlock = if (estimator.isClassification) { Caffe2DML.LOG.debug("Prediction assuming classification") @@ -831,6 +844,15 @@ class Caffe2DMLModel(val numClasses: String, val sc: SparkContext, val solver: C val outShape = estimator.getOutputShapeOfLastLayer baseTransform(X, sc, "Prob", outShape._1.toInt, outShape._2.toInt, outShape._3.toInt) } + def transform_probability(X_file: String): String = + if (estimator.isClassification) { + Caffe2DML.LOG.debug("Prediction of probability assuming classification") + baseTransformProbability(X_file, sc, "Prob") + } else { + Caffe2DML.LOG.debug("Prediction of probability assuming segmentation") + val outShape = estimator.getOutputShapeOfLastLayer + baseTransformProbability(X_file, sc, "Prob", outShape._1.toInt, outShape._2.toInt, outShape._3.toInt) + } def transform_probability(X: MatrixBlock): MatrixBlock = if (estimator.isClassification) { Caffe2DML.LOG.debug("Prediction of probability assuming classification") http://git-wip-us.apache.org/repos/asf/systemml/blob/8ffa3d15/src/main/scala/org/apache/sysml/api/ml/BaseSystemMLClassifier.scala ---------------------------------------------------------------------- diff --git a/src/main/scala/org/apache/sysml/api/ml/BaseSystemMLClassifier.scala b/src/main/scala/org/apache/sysml/api/ml/BaseSystemMLClassifier.scala index 97abe9e..5d22c46 100644 --- a/src/main/scala/org/apache/sysml/api/ml/BaseSystemMLClassifier.scala +++ b/src/main/scala/org/apache/sysml/api/ml/BaseSystemMLClassifier.scala @@ -111,6 +111,11 @@ trait HasRegParam extends Params { } trait BaseSystemMLEstimatorOrModel { + def dmlRead(X:String, fileX:String):String = { + val format = if(fileX.endsWith(".csv")) ", format=\"csv\"" else "" + return X + " = read(\"" + fileX + "\"" + format + "); " + } + def dmlWrite(X:String):String = "write("+ X + ", \"output.mtx\", format=\"binary\"); " var enableGPU: Boolean = false var forceGPU: Boolean = false var explain: Boolean = false @@ -215,6 +220,16 @@ trait BaseSystemMLEstimatorModel extends BaseSystemMLEstimatorOrModel { } trait BaseSystemMLClassifier extends BaseSystemMLEstimator { + def baseFit(X_file: String, y_file: String, sc: SparkContext): MLResults = { + val isSingleNode = false + val ml = new MLContext(sc) + updateML(ml) + val readScript = dml(dmlRead("X", X_file) + dmlRead("y", y_file)).out("X", "y") + val res = ml.execute(readScript) + val ret = getTrainingScript(isSingleNode) + val script = ret._1.in(ret._2, res.getMatrix("X")).in(ret._3, res.getMatrix("y")) + ml.execute(script) + } def baseFit(X_mb: MatrixBlock, y_mb: MatrixBlock, sc: SparkContext): MLResults = { val isSingleNode = true val ml = new MLContext(sc) @@ -242,7 +257,32 @@ trait BaseSystemMLClassifier extends BaseSystemMLEstimator { trait BaseSystemMLClassifierModel extends BaseSystemMLEstimatorModel { + def baseTransform(X_file: String, sc: SparkContext, probVar: String): String = baseTransform(X_file, sc, probVar, -1, 1, 1) def baseTransform(X: MatrixBlock, sc: SparkContext, probVar: String): MatrixBlock = baseTransform(X, sc, probVar, -1, 1, 1) + + def baseTransform(X: String, sc: SparkContext, probVar: String, C: Int, H: Int, W: Int): String = { + val Prob = baseTransformHelper(X, sc, probVar, C, H, W) + val script1 = dml("source(\"nn/util.dml\") as util; Prediction = util::predict_class(Prob, C, H, W); " + dmlWrite("Prediction")) + .in("Prob", Prob) + .in("C", C) + .in("H", H) + .in("W", W) + val ml = new MLContext(sc) + updateML(ml) + ml.execute(script1) + return "output.mtx" + } + + def baseTransformHelper(X_file: String, sc: SparkContext, probVar: String, C: Int, H: Int, W: Int): Matrix = { + val isSingleNode = true + val ml = new MLContext(sc) + updateML(ml) + val readScript = dml(dmlRead("X", X_file)).out("X") + val res = ml.execute(readScript) + val script = getPredictionScript(isSingleNode) + val modelPredict = ml.execute(script._1.in(script._2, res.getMatrix("X"))) + return modelPredict.getMatrix(probVar) + } def baseTransform(X: MatrixBlock, sc: SparkContext, probVar: String, C: Int, H: Int, W: Int): MatrixBlock = { val Prob = baseTransformHelper(X, sc, probVar, C, H, W) @@ -282,9 +322,18 @@ trait BaseSystemMLClassifierModel extends BaseSystemMLEstimatorModel { def baseTransformProbability(X: MatrixBlock, sc: SparkContext, probVar: String): MatrixBlock = baseTransformProbability(X, sc, probVar, -1, 1, 1) + def baseTransformProbability(X: String, sc: SparkContext, probVar: String): String = + baseTransformProbability(X, sc, probVar, -1, 1, 1) + def baseTransformProbability(X: MatrixBlock, sc: SparkContext, probVar: String, C: Int, H: Int, W: Int): MatrixBlock = return baseTransformHelper(X, sc, probVar, C, H, W).toMatrixBlock + def baseTransformProbability(X: String, sc: SparkContext, probVar: String, C: Int, H: Int, W: Int): String = { + val Prob = baseTransformHelper(X, sc, probVar, C, H, W) + (new MLContext(sc)).execute(dml(dmlWrite("Prob")).in("Prob", Prob)) + "output.mtx" + } + def baseTransform(df: ScriptsUtils.SparkDataType, sc: SparkContext, probVar: String, outputProb: Boolean = true): DataFrame = baseTransform(df, sc, probVar, outputProb, -1, 1, 1) http://git-wip-us.apache.org/repos/asf/systemml/blob/8ffa3d15/src/main/scala/org/apache/sysml/api/ml/BaseSystemMLRegressor.scala ---------------------------------------------------------------------- diff --git a/src/main/scala/org/apache/sysml/api/ml/BaseSystemMLRegressor.scala b/src/main/scala/org/apache/sysml/api/ml/BaseSystemMLRegressor.scala index d94655b..4731422 100644 --- a/src/main/scala/org/apache/sysml/api/ml/BaseSystemMLRegressor.scala +++ b/src/main/scala/org/apache/sysml/api/ml/BaseSystemMLRegressor.scala @@ -35,6 +35,17 @@ import org.apache.sysml.api.mlcontext.ScriptFactory._ trait BaseSystemMLRegressor extends BaseSystemMLEstimator { + def baseFit(X_file: String, y_file: String, sc: SparkContext): MLResults = { + val isSingleNode = false + val ml = new MLContext(sc) + updateML(ml) + val readScript = dml(dmlRead("X", X_file) + dmlRead("y", y_file)).out("X", "y") + val res = ml.execute(readScript) + val ret = getTrainingScript(isSingleNode) + val script = ret._1.in(ret._2, res.getMatrix("X")).in(ret._3, res.getMatrix("y")) + ml.execute(script) + } + def baseFit(X_mb: MatrixBlock, y_mb: MatrixBlock, sc: SparkContext): MLResults = { val isSingleNode = true val ml = new MLContext(sc) @@ -61,6 +72,18 @@ trait BaseSystemMLRegressor extends BaseSystemMLEstimator { trait BaseSystemMLRegressorModel extends BaseSystemMLEstimatorModel { + def baseTransform(X_file: String, sc: SparkContext, predictionVar: String): String = { + val isSingleNode = false + val ml = new MLContext(sc) + updateML(ml) + val readScript = dml(dmlRead("X", X_file)).out("X") + val res = ml.execute(readScript) + val script = getPredictionScript(isSingleNode) + val modelPredict = ml.execute(script._1.in(script._2, res.getMatrix("X"))) + val writeScript = dml(dmlWrite("X")).in("X", modelPredict.getMatrix(predictionVar)) + ml.execute(writeScript) + return "output.mtx" + } def baseTransform(X: MatrixBlock, sc: SparkContext, predictionVar: String): MatrixBlock = { val isSingleNode = true val ml = new MLContext(sc) http://git-wip-us.apache.org/repos/asf/systemml/blob/8ffa3d15/src/main/scala/org/apache/sysml/api/ml/LinearRegression.scala ---------------------------------------------------------------------- diff --git a/src/main/scala/org/apache/sysml/api/ml/LinearRegression.scala b/src/main/scala/org/apache/sysml/api/ml/LinearRegression.scala index b6f4966..ffb5d72 100644 --- a/src/main/scala/org/apache/sysml/api/ml/LinearRegression.scala +++ b/src/main/scala/org/apache/sysml/api/ml/LinearRegression.scala @@ -76,6 +76,11 @@ class LinearRegression(override val uid: String, val sc: SparkContext, val solve .out("beta_out") (script, "X", "y") } + + def fit(X_file: String, y_file: String): LinearRegressionModel = { + mloutput = baseFit(X_file, y_file, sc) + new LinearRegressionModel(this) + } def fit(X_mb: MatrixBlock, y_mb: MatrixBlock): LinearRegressionModel = { mloutput = baseFit(X_mb, y_mb, sc) @@ -102,6 +107,7 @@ class LinearRegressionModel(override val uid: String)(estimator: LinearRegressio } def transform_probability(X: MatrixBlock): MatrixBlock = throw new DMLRuntimeException("Unsupported method") + def transform_probability(X_file: String): String = throw new DMLRuntimeException("Unsupported method") def baseEstimator(): BaseSystemMLEstimator = estimator @@ -115,7 +121,6 @@ class LinearRegressionModel(override val uid: String)(estimator: LinearRegressio def modelVariables(): List[String] = List[String]("beta_out") def transform(df: ScriptsUtils.SparkDataType): DataFrame = baseTransform(df, sc, "means") - def transform(X: MatrixBlock): MatrixBlock = baseTransform(X, sc, "means") - + def transform(X_file: String): String = baseTransform(X_file, sc, "means") } http://git-wip-us.apache.org/repos/asf/systemml/blob/8ffa3d15/src/main/scala/org/apache/sysml/api/ml/LogisticRegression.scala ---------------------------------------------------------------------- diff --git a/src/main/scala/org/apache/sysml/api/ml/LogisticRegression.scala b/src/main/scala/org/apache/sysml/api/ml/LogisticRegression.scala index 98b6dd4..23ebcce 100644 --- a/src/main/scala/org/apache/sysml/api/ml/LogisticRegression.scala +++ b/src/main/scala/org/apache/sysml/api/ml/LogisticRegression.scala @@ -59,6 +59,11 @@ class LogisticRegression(override val uid: String, val sc: SparkContext) val that = new LogisticRegression(uid, sc) copyValues(that, extra) } + + def fit(X_file: String, y_file: String): LogisticRegressionModel = { + mloutput = baseFit(X_file, y_file, sc) + new LogisticRegressionModel(this) + } // Note: will update the y_mb as this will be called by Python mllearn def fit(X_mb: MatrixBlock, y_mb: MatrixBlock): LogisticRegressionModel = { @@ -116,7 +121,9 @@ class LogisticRegressionModel(override val uid: String)(estimator: LogisticRegre def modelVariables(): List[String] = List[String]("B_out") def transform(X: MatrixBlock): MatrixBlock = baseTransform(X, sc, "means") + def transform(X: String): String = baseTransform(X, sc, "means") def transform_probability(X: MatrixBlock): MatrixBlock = baseTransformProbability(X, sc, "means") + def transform_probability(X: String): String = baseTransformProbability(X, sc, "means") def transform(df: ScriptsUtils.SparkDataType): DataFrame = baseTransform(df, sc, "means") } http://git-wip-us.apache.org/repos/asf/systemml/blob/8ffa3d15/src/main/scala/org/apache/sysml/api/ml/NaiveBayes.scala ---------------------------------------------------------------------- diff --git a/src/main/scala/org/apache/sysml/api/ml/NaiveBayes.scala b/src/main/scala/org/apache/sysml/api/ml/NaiveBayes.scala index 8ecd4f0..43d1c53 100644 --- a/src/main/scala/org/apache/sysml/api/ml/NaiveBayes.scala +++ b/src/main/scala/org/apache/sysml/api/ml/NaiveBayes.scala @@ -44,6 +44,11 @@ class NaiveBayes(override val uid: String, val sc: SparkContext) extends Estimat } def setLaplace(value: Double) = set(laplace, value) + def fit(X_file: String, y_file: String): NaiveBayesModel = { + mloutput = baseFit(X_file, y_file, sc) + new NaiveBayesModel(this) + } + // Note: will update the y_mb as this will be called by Python mllearn def fit(X_mb: MatrixBlock, y_mb: MatrixBlock): NaiveBayesModel = { mloutput = baseFit(X_mb, y_mb, sc) @@ -108,7 +113,9 @@ class NaiveBayesModel(override val uid: String)(estimator: NaiveBayes, val sc: S def baseEstimator(): BaseSystemMLEstimator = estimator def transform(X: MatrixBlock): MatrixBlock = baseTransform(X, sc, "probs") + def transform(X: String): String = baseTransform(X, sc, "probs") def transform_probability(X: MatrixBlock): MatrixBlock = baseTransformProbability(X, sc, "probs") + def transform_probability(X: String): String = baseTransformProbability(X, sc, "probs") def transform(df: ScriptsUtils.SparkDataType): DataFrame = baseTransform(df, sc, "probs") } http://git-wip-us.apache.org/repos/asf/systemml/blob/8ffa3d15/src/main/scala/org/apache/sysml/api/ml/SVM.scala ---------------------------------------------------------------------- diff --git a/src/main/scala/org/apache/sysml/api/ml/SVM.scala b/src/main/scala/org/apache/sysml/api/ml/SVM.scala index c4b7ae4..98ff81a 100644 --- a/src/main/scala/org/apache/sysml/api/ml/SVM.scala +++ b/src/main/scala/org/apache/sysml/api/ml/SVM.scala @@ -69,6 +69,11 @@ class SVM(override val uid: String, val sc: SparkContext, val isMultiClass: Bool .out("w") (script, "X", "Y") } + + def fit(X_file: String, y_file: String): SVMModel = { + mloutput = baseFit(X_file, y_file, sc) + new SVMModel(this, isMultiClass) + } // Note: will update the y_mb as this will be called by Python mllearn def fit(X_mb: MatrixBlock, y_mb: MatrixBlock): SVMModel = { @@ -121,5 +126,7 @@ class SVMModel(override val uid: String)(estimator: SVM, val sc: SparkContext, v def transform(X: MatrixBlock): MatrixBlock = baseTransform(X, sc, "scores") def transform_probability(X: MatrixBlock): MatrixBlock = baseTransformProbability(X, sc, "scores") + def transform(X: String): String = baseTransform(X, sc, "scores") + def transform_probability(X: String): String = baseTransformProbability(X, sc, "scores") def transform(df: ScriptsUtils.SparkDataType): DataFrame = baseTransform(df, sc, "scores") }
