Repository: spark
Updated Branches:
  refs/heads/master 129f2f455 -> 90b46e014


[SPARK-7861][ML] PySpark OneVsRest

## What changes were proposed in this pull request?

https://issues.apache.org/jira/browse/SPARK-7861

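Add OneVsRest to PySpark. It is implemented in Python rather than as a wrapper around the JVM class, since it is a meta-pipeline that trains one model of a user-supplied base classifier per class.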

## How was this patch tested?

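Tested with doctests in `python/pyspark/ml/classification.py` and with new unit tests (`OneVsRestTests`) in `python/pyspark/ml/tests.py`.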

Author: Xusen Yin <[email protected]>

Closes #12124 from yinxusen/SPARK-14306-7861.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/90b46e01
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/90b46e01
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/90b46e01

Branch: refs/heads/master
Commit: 90b46e014a60069bd18754b02fce056d8f4d1b3e
Parents: 129f2f4
Author: Xusen Yin <[email protected]>
Authored: Fri Apr 15 12:58:38 2016 -0700
Committer: Joseph K. Bradley <[email protected]>
Committed: Fri Apr 15 12:58:38 2016 -0700

----------------------------------------------------------------------
 python/pyspark/ml/classification.py | 224 ++++++++++++++++++++++++++++++-
 python/pyspark/ml/tests.py          |  32 ++++-
 2 files changed, 249 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/90b46e01/python/pyspark/ml/classification.py
----------------------------------------------------------------------
diff --git a/python/pyspark/ml/classification.py b/python/pyspark/ml/classification.py
index 7051798..0893167 100644
--- a/python/pyspark/ml/classification.py
+++ b/python/pyspark/ml/classification.py
@@ -15,18 +15,21 @@
 # limitations under the License.
 #
 
+import operator
 import warnings
 
-from pyspark import since
-from pyspark.ml.util import *
-from pyspark.ml.wrapper import JavaEstimator, JavaModel, JavaWrapper
-from pyspark.ml.param import TypeConverters
+from pyspark.ml import Estimator, Model
 from pyspark.ml.param.shared import *
 from pyspark.ml.regression import (
     RandomForestParams, TreeEnsembleParams, DecisionTreeModel, 
TreeEnsembleModels)
+from pyspark.ml.util import *
+from pyspark.ml.wrapper import JavaEstimator, JavaModel
+from pyspark.ml.wrapper import JavaWrapper
 from pyspark.mllib.common import inherit_doc
 from pyspark.sql import DataFrame
-
+from pyspark.sql.functions import udf, when
+from pyspark.sql.types import ArrayType, DoubleType
+from pyspark.storagelevel import StorageLevel
 
 __all__ = ['LogisticRegression', 'LogisticRegressionModel',
            'LogisticRegressionSummary', 'LogisticRegressionTrainingSummary',
@@ -35,7 +38,8 @@ __all__ = ['LogisticRegression', 'LogisticRegressionModel',
            'GBTClassifier', 'GBTClassificationModel',
            'RandomForestClassifier', 'RandomForestClassificationModel',
            'NaiveBayes', 'NaiveBayesModel',
-           'MultilayerPerceptronClassifier', 
'MultilayerPerceptronClassificationModel']
+           'MultilayerPerceptronClassifier', 
'MultilayerPerceptronClassificationModel',
+           'OneVsRest', 'OneVsRestModel']
 
 
 @inherit_doc
@@ -1156,6 +1160,214 @@ class MultilayerPerceptronClassificationModel(JavaModel, JavaMLWritable, JavaMLR
         return self._call_java("weights")
 
 
+@inherit_doc
+class OneVsRest(Estimator, HasFeaturesCol, HasLabelCol, HasPredictionCol):
+    """
+    Reduction of Multiclass Classification to Binary Classification.
+    Performs reduction using one against all strategy.
+    For a multiclass classification with k classes, train k models (one per 
class).
+    Each example is scored against all k models and the model with highest 
score
+    is picked to label the example.
+
+    >>> from pyspark.sql import Row
+    >>> from pyspark.mllib.linalg import Vectors
+    >>> df = sc.parallelize([
+    ...     Row(label=0.0, features=Vectors.dense(1.0, 0.8)),
+    ...     Row(label=1.0, features=Vectors.sparse(2, [], [])),
+    ...     Row(label=2.0, features=Vectors.dense(0.5, 0.5))]).toDF()
+    >>> lr = LogisticRegression(maxIter=5, regParam=0.01)
+    >>> ovr = OneVsRest(classifier=lr)
+    >>> model = ovr.fit(df)
+    >>> [x.coefficients for x in model.models]
+    [DenseVector([3.3925, 1.8785]), DenseVector([-4.3016, -6.3163]), 
DenseVector([-4.5855, 6.1785])]
+    >>> [x.intercept for x in model.models]
+    [-3.6474708290602034, 2.5507881951814495, -1.1016513228162115]
+    >>> test0 = sc.parallelize([Row(features=Vectors.dense(-1.0, 0.0))]).toDF()
+    >>> model.transform(test0).head().prediction
+    1.0
+    >>> test1 = sc.parallelize([Row(features=Vectors.sparse(2, [0], 
[1.0]))]).toDF()
+    >>> model.transform(test1).head().prediction
+    0.0
+    >>> test2 = sc.parallelize([Row(features=Vectors.dense(0.5, 0.4))]).toDF()
+    >>> model.transform(test2).head().prediction
+    2.0
+
+    .. versionadded:: 2.0.0
+    """
+
+    classifier = Param(Params._dummy(), "classifier", "base binary classifier")
+
+    @keyword_only
+    def __init__(self, featuresCol="features", labelCol="label", 
predictionCol="prediction",
+                 classifier=None):
+        """
+        __init__(self, featuresCol="features", labelCol="label", 
predictionCol="prediction", \
+                 classifier=None)
+        """
+        super(OneVsRest, self).__init__()
+        kwargs = self.__init__._input_kwargs
+        self._set(**kwargs)
+
+    @keyword_only
+    @since("2.0.0")
+    def setParams(self, featuresCol=None, labelCol=None, predictionCol=None, 
classifier=None):
+        """
+        setParams(self, featuresCol=None, labelCol=None, predictionCol=None, 
classifier=None):
+        Sets params for OneVsRest.
+        """
+        kwargs = self.setParams._input_kwargs
+        return self._set(**kwargs)
+
+    @since("2.0.0")
+    def setClassifier(self, value):
+        """
+        Sets the value of :py:attr:`classifier`.
+
+        .. note:: Only LogisticRegression and NaiveBayes are supported now.
+        """
+        self._set(classifier=value)
+        return self
+
+    @since("2.0.0")
+    def getClassifier(self):
+        """
+        Gets the value of classifier or its default value.
+        """
+        return self.getOrDefault(self.classifier)
+
+    def _fit(self, dataset):
+        labelCol = self.getLabelCol()
+        featuresCol = self.getFeaturesCol()
+        predictionCol = self.getPredictionCol()
+        classifier = self.getClassifier()
+        assert isinstance(classifier, HasRawPredictionCol),\
+            "Classifier %s doesn't extend from HasRawPredictionCol." % 
type(classifier)
+
+        numClasses = int(dataset.agg({labelCol: 
"max"}).head()["max("+labelCol+")"]) + 1
+
+        multiclassLabeled = dataset.select(labelCol, featuresCol)
+
+        # persist if underlying dataset is not persistent.
+        handlePersistence = \
+            dataset.rdd.getStorageLevel() == StorageLevel(False, False, False, 
False)
+        if handlePersistence:
+            multiclassLabeled.persist(StorageLevel.MEMORY_AND_DISK)
+
+        def trainSingleClass(index):
+            binaryLabelCol = "mc2b$" + str(index)
+            trainingDataset = multiclassLabeled.withColumn(
+                binaryLabelCol,
+                when(multiclassLabeled[labelCol] == float(index), 
1.0).otherwise(0.0))
+            paramMap = dict([(classifier.labelCol, binaryLabelCol),
+                            (classifier.featuresCol, featuresCol),
+                            (classifier.predictionCol, predictionCol)])
+            return classifier.fit(trainingDataset, paramMap)
+
+        # TODO: Parallel training for all classes.
+        models = [trainSingleClass(i) for i in range(numClasses)]
+
+        if handlePersistence:
+            multiclassLabeled.unpersist()
+
+        return self._copyValues(OneVsRestModel(models=models))
+
+    @since("2.0.0")
+    def copy(self, extra=None):
+        """
+        Creates a copy of this instance with a randomly generated uid
+        and some extra params. This creates a deep copy of the embedded 
paramMap,
+        and copies the embedded and extra parameters over.
+
+        :param extra: Extra parameters to copy to the new instance
+        :return: Copy of this instance
+        """
+        if extra is None:
+            extra = dict()
+        newOvr = Params.copy(self, extra)
+        if self.isSet(self.classifier):
+            newOvr.setClassifier(self.getClassifier().copy(extra))
+        return newOvr
+
+
+class OneVsRestModel(Model, HasFeaturesCol, HasLabelCol, HasPredictionCol):
+    """
+    Model fitted by OneVsRest.
+    This stores the models resulting from training k binary classifiers: one 
for each class.
+    Each example is scored against all k models, and the model with the 
highest score
+    is picked to label the example.
+
+    .. versionadded:: 2.0.0
+    """
+
+    def __init__(self, models):
+        super(OneVsRestModel, self).__init__()
+        self.models = models
+
+    def _transform(self, dataset):
+        # determine the input columns: these need to be passed through
+        origCols = dataset.columns
+
+        # add an accumulator column to store predictions of all the models
+        accColName = "mbc$acc" + str(uuid.uuid4())
+        initUDF = udf(lambda _: [], ArrayType(DoubleType()))
+        newDataset = dataset.withColumn(accColName, 
initUDF(dataset[origCols[0]]))
+
+        # persist if underlying dataset is not persistent.
+        handlePersistence = \
+            dataset.rdd.getStorageLevel() == StorageLevel(False, False, False, 
False)
+        if handlePersistence:
+            newDataset.persist(StorageLevel.MEMORY_AND_DISK)
+
+        # update the accumulator column with the result of prediction of models
+        aggregatedDataset = newDataset
+        for index, model in enumerate(self.models):
+            rawPredictionCol = model._call_java("getRawPredictionCol")
+            columns = origCols + [rawPredictionCol, accColName]
+
+            # add temporary column to store intermediate scores and update
+            tmpColName = "mbc$tmp" + str(uuid.uuid4())
+            updateUDF = udf(
+                lambda predictions, prediction: predictions + 
[prediction.tolist()[1]],
+                ArrayType(DoubleType()))
+            transformedDataset = 
model.transform(aggregatedDataset).select(*columns)
+            updatedDataset = transformedDataset.withColumn(
+                tmpColName,
+                updateUDF(transformedDataset[accColName], 
transformedDataset[rawPredictionCol]))
+            newColumns = origCols + [tmpColName]
+
+            # switch out the intermediate column with the accumulator column
+            aggregatedDataset = updatedDataset\
+                .select(*newColumns).withColumnRenamed(tmpColName, accColName)
+
+        if handlePersistence:
+            newDataset.unpersist()
+
+        # output the index of the classifier with highest confidence as 
prediction
+        labelUDF = udf(
+            lambda predictions: float(max(enumerate(predictions), 
key=operator.itemgetter(1))[0]),
+            DoubleType())
+
+        # output label and label metadata as prediction
+        return aggregatedDataset.withColumn(
+            self.getPredictionCol(), 
labelUDF(aggregatedDataset[accColName])).drop(accColName)
+
+    @since("2.0.0")
+    def copy(self, extra=None):
+        """
+        Creates a copy of this instance with a randomly generated uid
+        and some extra params. This creates a deep copy of the embedded 
paramMap,
+        and copies the embedded and extra parameters over.
+
+        :param extra: Extra parameters to copy to the new instance
+        :return: Copy of this instance
+        """
+        if extra is None:
+            extra = dict()
+        newModel = Params.copy(self, extra)
+        newModel.models = [model.copy(extra) for model in self.models]
+        return newModel
+
+
 if __name__ == "__main__":
     import doctest
     import pyspark.ml.classification

http://git-wip-us.apache.org/repos/asf/spark/blob/90b46e01/python/pyspark/ml/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/ml/tests.py b/python/pyspark/ml/tests.py
index 85ad949..d595eff 100644
--- a/python/pyspark/ml/tests.py
+++ b/python/pyspark/ml/tests.py
@@ -43,7 +43,7 @@ import tempfile
 import numpy as np
 
 from pyspark.ml import Estimator, Model, Pipeline, PipelineModel, Transformer
-from pyspark.ml.classification import LogisticRegression, 
DecisionTreeClassifier
+from pyspark.ml.classification import LogisticRegression, 
DecisionTreeClassifier, OneVsRest
 from pyspark.ml.clustering import KMeans
 from pyspark.ml.evaluation import BinaryClassificationEvaluator, 
RegressionEvaluator
 from pyspark.ml.feature import *
@@ -850,6 +850,36 @@ class TrainingSummaryTest(PySparkTestCase):
         self.assertAlmostEqual(sameSummary.areaUnderROC, s.areaUnderROC)
 
 
+class OneVsRestTests(PySparkTestCase):
+
+    def test_copy(self):
+        sqlContext = SQLContext(self.sc)
+        df = sqlContext.createDataFrame([(0.0, Vectors.dense(1.0, 0.8)),
+                                         (1.0, Vectors.sparse(2, [], [])),
+                                         (2.0, Vectors.dense(0.5, 0.5))],
+                                        ["label", "features"])
+        lr = LogisticRegression(maxIter=5, regParam=0.01)
+        ovr = OneVsRest(classifier=lr)
+        ovr1 = ovr.copy({lr.maxIter: 10})
+        self.assertEqual(ovr.getClassifier().getMaxIter(), 5)
+        self.assertEqual(ovr1.getClassifier().getMaxIter(), 10)
+        model = ovr.fit(df)
+        model1 = model.copy({model.predictionCol: "indexed"})
+        self.assertEqual(model1.getPredictionCol(), "indexed")
+
+    def test_output_columns(self):
+        sqlContext = SQLContext(self.sc)
+        df = sqlContext.createDataFrame([(0.0, Vectors.dense(1.0, 0.8)),
+                                         (1.0, Vectors.sparse(2, [], [])),
+                                         (2.0, Vectors.dense(0.5, 0.5))],
+                                        ["label", "features"])
+        lr = LogisticRegression(maxIter=5, regParam=0.01)
+        ovr = OneVsRest(classifier=lr)
+        model = ovr.fit(df)
+        output = model.transform(df)
+        self.assertEqual(output.columns, ["label", "features", "prediction"])
+
+
 class HashingTFTest(PySparkTestCase):
 
     def test_apply_binary_term_freqs(self):


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to