Further, I think we should return just
broadcastVariable = getRuntimeContext.
getBroadcastVariable[B]("broadcastVariable")
in BroadcastSingleElementMapper.
A user may wish to have a whole list broadcast, rather than only access the
first element. For example, this would make sense in the k-means algorithm.

Regards
Sachin Goel

On Tue, Jun 2, 2015 at 9:03 PM, Sachin Goel <sachingoel0...@gmail.com>
wrote:

> Hi Till
> This works only when there is a single variable to be broadcast, doesn't
> it? What about the case when we need to broadcast two? Is it advisable to
> create a BroadcastDoubleElementMapper class, or could we just send a
> tuple of all the variables? Perhaps the latter is the better idea.
>
> Regards
> Sachin Goel
>
> On Tue, Jun 2, 2015 at 8:15 PM, <trohrm...@apache.org> wrote:
>
>> [ml] Replaces RichMapFunctions with mapWithBcVariable in FlinkML
>>
>>
>> Project: http://git-wip-us.apache.org/repos/asf/flink/repo
>> Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/950b79c5
>> Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/950b79c5
>> Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/950b79c5
>>
>> Branch: refs/heads/master
>> Commit: 950b79c59327e96e3b1504616d26460cbff7fd4c
>> Parents: 44dae0c
>> Author: Till Rohrmann <trohrm...@apache.org>
>> Authored: Tue Jun 2 14:45:12 2015 +0200
>> Committer: Till Rohrmann <trohrm...@apache.org>
>> Committed: Tue Jun 2 15:34:54 2015 +0200
>>
>> ----------------------------------------------------------------------
>>  .../apache/flink/ml/classification/SVM.scala    | 73 ++++++--------------
>>  .../flink/ml/preprocessing/StandardScaler.scala | 39 +++--------
>>  2 files changed, 30 insertions(+), 82 deletions(-)
>> ----------------------------------------------------------------------
>>
>>
>>
>> http://git-wip-us.apache.org/repos/asf/flink/blob/950b79c5/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala
>> ----------------------------------------------------------------------
>> diff --git
>> a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala
>> b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala
>> index e01735f..c69b56a 100644
>> ---
>> a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala
>> +++
>> b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala
>> @@ -26,6 +26,7 @@ import scala.util.Random
>>  import org.apache.flink.api.common.functions.RichMapFunction
>>  import org.apache.flink.api.scala._
>>  import org.apache.flink.configuration.Configuration
>> +import org.apache.flink.ml._
>>  import org.apache.flink.ml.common.FlinkMLTools.ModuloKeyPartitioner
>>  import org.apache.flink.ml.common._
>>  import org.apache.flink.ml.math.Vector
>> @@ -190,6 +191,7 @@ class SVM extends Predictor[SVM] {
>>    * of the algorithm.
>>    */
>>  object SVM{
>> +
>>    val WEIGHT_VECTOR ="weightVector"
>>
>>    // ========================================== Parameters
>> =========================================
>> @@ -242,7 +244,13 @@ object SVM{
>>
>>          instance.weightsOption match {
>>            case Some(weights) => {
>> -            input.map(new PredictionMapper[T]).withBroadcastSet(weights,
>> WEIGHT_VECTOR)
>> +            input.mapWithBcVariable(weights){
>> +              (vector, weights) => {
>> +                val dotProduct = weights dot vector.asBreeze
>> +
>> +                LabeledVector(dotProduct, vector)
>> +              }
>> +            }
>>            }
>>
>>            case None => {
>> @@ -254,28 +262,6 @@ object SVM{
>>      }
>>    }
>>
>> -  /** Mapper to calculate the value of the prediction function. This is
>> a RichMapFunction, because
>> -    * we broadcast the weight vector to all mappers.
>> -    */
>> -  class PredictionMapper[T <: Vector] extends RichMapFunction[T,
>> LabeledVector] {
>> -
>> -    var weights: BreezeDenseVector[Double] = _
>> -
>> -    @throws(classOf[Exception])
>> -    override def open(configuration: Configuration): Unit = {
>> -      // get current weights
>> -      weights = getRuntimeContext.
>> -
>> getBroadcastVariable[BreezeDenseVector[Double]](WEIGHT_VECTOR).get(0)
>> -    }
>> -
>> -    override def map(vector: T): LabeledVector = {
>> -      // calculate the prediction value (scaled distance from the
>> separating hyperplane)
>> -      val dotProduct = weights dot vector.asBreeze
>> -
>> -      LabeledVector(dotProduct, vector)
>> -    }
>> -  }
>> -
>>    /** [[org.apache.flink.ml.pipeline.PredictOperation]] for
>> [[LabeledVector ]]types. The result type
>>      * is a [[(Double, Double)]] tuple, corresponding to (truth,
>> prediction)
>>      *
>> @@ -291,7 +277,14 @@ object SVM{
>>
>>          instance.weightsOption match {
>>            case Some(weights) => {
>> -            input.map(new
>> LabeledPredictionMapper).withBroadcastSet(weights, WEIGHT_VECTOR)
>> +            input.mapWithBcVariable(weights){
>> +              (labeledVector, weights) => {
>> +                val prediction = weights dot
>> labeledVector.vector.asBreeze
>> +                val truth = labeledVector.label
>> +
>> +                (truth, prediction)
>> +              }
>> +            }
>>            }
>>
>>            case None => {
>> @@ -303,30 +296,6 @@ object SVM{
>>      }
>>    }
>>
>> -  /** Mapper to calculate the value of the prediction function. This is
>> a RichMapFunction, because
>> -    * we broadcast the weight vector to all mappers.
>> -    */
>> -  class LabeledPredictionMapper extends RichMapFunction[LabeledVector,
>> (Double, Double)] {
>> -
>> -    var weights: BreezeDenseVector[Double] = _
>> -
>> -    @throws(classOf[Exception])
>> -    override def open(configuration: Configuration): Unit = {
>> -      // get current weights
>> -      weights = getRuntimeContext.
>> -
>> getBroadcastVariable[BreezeDenseVector[Double]](WEIGHT_VECTOR).get(0)
>> -    }
>> -
>> -    override def map(labeledVector: LabeledVector): (Double, Double) = {
>> -      // calculate the prediction value (scaled distance from the
>> separating hyperplane)
>> -      val prediction = weights dot labeledVector.vector.asBreeze
>> -      val truth = labeledVector.label
>> -
>> -      (truth, prediction)
>> -    }
>> -  }
>> -
>> -
>>    /** [[FitOperation]] which trains a SVM with soft-margin based on the
>> given training data set.
>>      *
>>      */
>> @@ -540,17 +509,17 @@ object SVM{
>>
>>      // compute projected gradient
>>      var proj_grad = if(alpha  <= 0.0){
>> -      math.min(grad, 0)
>> +      scala.math.min(grad, 0)
>>      } else if(alpha >= 1.0) {
>> -      math.max(grad, 0)
>> +      scala.math.max(grad, 0)
>>      } else {
>>        grad
>>      }
>>
>> -    if(math.abs(grad) != 0.0){
>> +    if(scala.math.abs(grad) != 0.0){
>>        val qii = x dot x
>>        val newAlpha = if(qii != 0.0){
>> -        math.min(math.max((alpha - (grad / qii)), 0.0), 1.0)
>> +        scala.math.min(scala.math.max((alpha - (grad / qii)), 0.0), 1.0)
>>        } else {
>>          1.0
>>        }
>>
>>
>> http://git-wip-us.apache.org/repos/asf/flink/blob/950b79c5/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala
>> ----------------------------------------------------------------------
>> diff --git
>> a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala
>> b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala
>> index 2e3ed95..7992b02 100644
>> ---
>> a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala
>> +++
>> b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala
>> @@ -25,6 +25,7 @@ import org.apache.flink.api.common.functions._
>>  import org.apache.flink.api.common.typeinfo.TypeInformation
>>  import org.apache.flink.api.scala._
>>  import org.apache.flink.configuration.Configuration
>> +import org.apache.flink.ml._
>>  import org.apache.flink.ml.common.{LabeledVector, Parameter,
>> ParameterMap}
>>  import org.apache.flink.ml.math.Breeze._
>>  import org.apache.flink.ml.math.{BreezeVectorConverter, Vector}
>> @@ -209,20 +210,9 @@ object StandardScaler {
>>
>>          instance.metricsOption match {
>>            case Some(metrics) => {
>> -            input.map(new RichMapFunction[T, T]() {
>> -
>> -              var broadcastMean: linalg.Vector[Double] = null
>> -              var broadcastStd: linalg.Vector[Double] = null
>> -
>> -              override def open(parameters: Configuration): Unit = {
>> -                val broadcastedMetrics =
>> getRuntimeContext().getBroadcastVariable[
>> -                    (linalg.Vector[Double], linalg.Vector[Double])
>> -                  ]("broadcastedMetrics").get(0)
>> -                broadcastMean = broadcastedMetrics._1
>> -                broadcastStd = broadcastedMetrics._2
>> -              }
>> -
>> -              override def map(vector: T): T = {
>> +            input.mapWithBcVariable(metrics){
>> +              (vector, metrics) => {
>> +                val (broadcastMean, broadcastStd) = metrics
>>                  var myVector = vector.asBreeze
>>
>>                  myVector -= broadcastMean
>> @@ -230,7 +220,7 @@ object StandardScaler {
>>                  myVector = (myVector :* std) + mean
>>                  myVector.fromBreeze
>>                }
>> -            }).withBroadcastSet(metrics, "broadcastedMetrics")
>> +            }
>>            }
>>
>>            case None =>
>> @@ -251,20 +241,9 @@ object StandardScaler {
>>
>>          instance.metricsOption match {
>>            case Some(metrics) => {
>> -            input.map(new RichMapFunction[LabeledVector,
>> LabeledVector]() {
>> -
>> -              var broadcastMean: linalg.Vector[Double] = null
>> -              var broadcastStd: linalg.Vector[Double] = null
>> -
>> -              override def open(parameters: Configuration): Unit = {
>> -                val broadcastedMetrics =
>> getRuntimeContext().getBroadcastVariable[
>> -                  (linalg.Vector[Double], linalg.Vector[Double])
>> -                  ]("broadcastedMetrics").get(0)
>> -                broadcastMean = broadcastedMetrics._1
>> -                broadcastStd = broadcastedMetrics._2
>> -              }
>> -
>> -              override def map(labeledVector: LabeledVector):
>> LabeledVector = {
>> +            input.mapWithBcVariable(metrics){
>> +              (labeledVector, metrics) => {
>> +                val (broadcastMean, broadcastStd) = metrics
>>                  val LabeledVector(label, vector) = labeledVector
>>                  var breezeVector = vector.asBreeze
>>
>> @@ -273,7 +252,7 @@ object StandardScaler {
>>                  breezeVector = (breezeVector :* std) + mean
>>                  LabeledVector(label, breezeVector.fromBreeze[Vector])
>>                }
>> -            }).withBroadcastSet(metrics, "broadcastedMetrics")
>> +            }
>>            }
>>
>>            case None =>
>>
>>
>

Reply via email to