Further, I think we should just return broadcastVariable = getRuntimeContext.getBroadcastVariable[B]("broadcastVariable") in BroadcastSingleElementMapper. A user may wish to have a whole list broadcast, and not just access its first element. For example, this would make sense in the k-means algorithm.
Regards Sachin Goel On Tue, Jun 2, 2015 at 9:03 PM, Sachin Goel <sachingoel0...@gmail.com> wrote: > Hi Till > This works only when there is only one variable to be broadcasted, doesn't > it? What about the case when we need to broadcast two? Is it advisable to > create a BroadcastDoubleElementMapper class or perhaps we could just send a > tuple of all the variables? Perhaps that is a better idea. > > Regards > Sachin Goel > > On Tue, Jun 2, 2015 at 8:15 PM, <trohrm...@apache.org> wrote: > >> [ml] Replaces RichMapFunctions with mapWithBcVariable in FlinkML >> >> >> Project: http://git-wip-us.apache.org/repos/asf/flink/repo >> Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/950b79c5 >> Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/950b79c5 >> Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/950b79c5 >> >> Branch: refs/heads/master >> Commit: 950b79c59327e96e3b1504616d26460cbff7fd4c >> Parents: 44dae0c >> Author: Till Rohrmann <trohrm...@apache.org> >> Authored: Tue Jun 2 14:45:12 2015 +0200 >> Committer: Till Rohrmann <trohrm...@apache.org> >> Committed: Tue Jun 2 15:34:54 2015 +0200 >> >> ---------------------------------------------------------------------- >> .../apache/flink/ml/classification/SVM.scala | 73 ++++++-------------- >> .../flink/ml/preprocessing/StandardScaler.scala | 39 +++-------- >> 2 files changed, 30 insertions(+), 82 deletions(-) >> ---------------------------------------------------------------------- >> >> >> >> http://git-wip-us.apache.org/repos/asf/flink/blob/950b79c5/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala >> ---------------------------------------------------------------------- >> diff --git >> a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala >> b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala >> index e01735f..c69b56a 100644 >> --- >> 
a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala >> +++ >> b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala >> @@ -26,6 +26,7 @@ import scala.util.Random >> import org.apache.flink.api.common.functions.RichMapFunction >> import org.apache.flink.api.scala._ >> import org.apache.flink.configuration.Configuration >> +import org.apache.flink.ml._ >> import org.apache.flink.ml.common.FlinkMLTools.ModuloKeyPartitioner >> import org.apache.flink.ml.common._ >> import org.apache.flink.ml.math.Vector >> @@ -190,6 +191,7 @@ class SVM extends Predictor[SVM] { >> * of the algorithm. >> */ >> object SVM{ >> + >> val WEIGHT_VECTOR ="weightVector" >> >> // ========================================== Parameters >> ========================================= >> @@ -242,7 +244,13 @@ object SVM{ >> >> instance.weightsOption match { >> case Some(weights) => { >> - input.map(new PredictionMapper[T]).withBroadcastSet(weights, >> WEIGHT_VECTOR) >> + input.mapWithBcVariable(weights){ >> + (vector, weights) => { >> + val dotProduct = weights dot vector.asBreeze >> + >> + LabeledVector(dotProduct, vector) >> + } >> + } >> } >> >> case None => { >> @@ -254,28 +262,6 @@ object SVM{ >> } >> } >> >> - /** Mapper to calculate the value of the prediction function. This is >> a RichMapFunction, because >> - * we broadcast the weight vector to all mappers. >> - */ >> - class PredictionMapper[T <: Vector] extends RichMapFunction[T, >> LabeledVector] { >> - >> - var weights: BreezeDenseVector[Double] = _ >> - >> - @throws(classOf[Exception]) >> - override def open(configuration: Configuration): Unit = { >> - // get current weights >> - weights = getRuntimeContext. 
>> - >> getBroadcastVariable[BreezeDenseVector[Double]](WEIGHT_VECTOR).get(0) >> - } >> - >> - override def map(vector: T): LabeledVector = { >> - // calculate the prediction value (scaled distance from the >> separating hyperplane) >> - val dotProduct = weights dot vector.asBreeze >> - >> - LabeledVector(dotProduct, vector) >> - } >> - } >> - >> /** [[org.apache.flink.ml.pipeline.PredictOperation]] for >> [[LabeledVector ]]types. The result type >> * is a [[(Double, Double)]] tuple, corresponding to (truth, >> prediction) >> * >> @@ -291,7 +277,14 @@ object SVM{ >> >> instance.weightsOption match { >> case Some(weights) => { >> - input.map(new >> LabeledPredictionMapper).withBroadcastSet(weights, WEIGHT_VECTOR) >> + input.mapWithBcVariable(weights){ >> + (labeledVector, weights) => { >> + val prediction = weights dot >> labeledVector.vector.asBreeze >> + val truth = labeledVector.label >> + >> + (truth, prediction) >> + } >> + } >> } >> >> case None => { >> @@ -303,30 +296,6 @@ object SVM{ >> } >> } >> >> - /** Mapper to calculate the value of the prediction function. This is >> a RichMapFunction, because >> - * we broadcast the weight vector to all mappers. >> - */ >> - class LabeledPredictionMapper extends RichMapFunction[LabeledVector, >> (Double, Double)] { >> - >> - var weights: BreezeDenseVector[Double] = _ >> - >> - @throws(classOf[Exception]) >> - override def open(configuration: Configuration): Unit = { >> - // get current weights >> - weights = getRuntimeContext. 
>> - >> getBroadcastVariable[BreezeDenseVector[Double]](WEIGHT_VECTOR).get(0) >> - } >> - >> - override def map(labeledVector: LabeledVector): (Double, Double) = { >> - // calculate the prediction value (scaled distance from the >> separating hyperplane) >> - val prediction = weights dot labeledVector.vector.asBreeze >> - val truth = labeledVector.label >> - >> - (truth, prediction) >> - } >> - } >> - >> - >> /** [[FitOperation]] which trains a SVM with soft-margin based on the >> given training data set. >> * >> */ >> @@ -540,17 +509,17 @@ object SVM{ >> >> // compute projected gradient >> var proj_grad = if(alpha <= 0.0){ >> - math.min(grad, 0) >> + scala.math.min(grad, 0) >> } else if(alpha >= 1.0) { >> - math.max(grad, 0) >> + scala.math.max(grad, 0) >> } else { >> grad >> } >> >> - if(math.abs(grad) != 0.0){ >> + if(scala.math.abs(grad) != 0.0){ >> val qii = x dot x >> val newAlpha = if(qii != 0.0){ >> - math.min(math.max((alpha - (grad / qii)), 0.0), 1.0) >> + scala.math.min(scala.math.max((alpha - (grad / qii)), 0.0), 1.0) >> } else { >> 1.0 >> } >> >> >> http://git-wip-us.apache.org/repos/asf/flink/blob/950b79c5/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala >> ---------------------------------------------------------------------- >> diff --git >> a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala >> b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala >> index 2e3ed95..7992b02 100644 >> --- >> a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala >> +++ >> b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala >> @@ -25,6 +25,7 @@ import org.apache.flink.api.common.functions._ >> import org.apache.flink.api.common.typeinfo.TypeInformation >> import org.apache.flink.api.scala._ >> import org.apache.flink.configuration.Configuration >> +import 
org.apache.flink.ml._ >> import org.apache.flink.ml.common.{LabeledVector, Parameter, >> ParameterMap} >> import org.apache.flink.ml.math.Breeze._ >> import org.apache.flink.ml.math.{BreezeVectorConverter, Vector} >> @@ -209,20 +210,9 @@ object StandardScaler { >> >> instance.metricsOption match { >> case Some(metrics) => { >> - input.map(new RichMapFunction[T, T]() { >> - >> - var broadcastMean: linalg.Vector[Double] = null >> - var broadcastStd: linalg.Vector[Double] = null >> - >> - override def open(parameters: Configuration): Unit = { >> - val broadcastedMetrics = >> getRuntimeContext().getBroadcastVariable[ >> - (linalg.Vector[Double], linalg.Vector[Double]) >> - ]("broadcastedMetrics").get(0) >> - broadcastMean = broadcastedMetrics._1 >> - broadcastStd = broadcastedMetrics._2 >> - } >> - >> - override def map(vector: T): T = { >> + input.mapWithBcVariable(metrics){ >> + (vector, metrics) => { >> + val (broadcastMean, broadcastStd) = metrics >> var myVector = vector.asBreeze >> >> myVector -= broadcastMean >> @@ -230,7 +220,7 @@ object StandardScaler { >> myVector = (myVector :* std) + mean >> myVector.fromBreeze >> } >> - }).withBroadcastSet(metrics, "broadcastedMetrics") >> + } >> } >> >> case None => >> @@ -251,20 +241,9 @@ object StandardScaler { >> >> instance.metricsOption match { >> case Some(metrics) => { >> - input.map(new RichMapFunction[LabeledVector, >> LabeledVector]() { >> - >> - var broadcastMean: linalg.Vector[Double] = null >> - var broadcastStd: linalg.Vector[Double] = null >> - >> - override def open(parameters: Configuration): Unit = { >> - val broadcastedMetrics = >> getRuntimeContext().getBroadcastVariable[ >> - (linalg.Vector[Double], linalg.Vector[Double]) >> - ]("broadcastedMetrics").get(0) >> - broadcastMean = broadcastedMetrics._1 >> - broadcastStd = broadcastedMetrics._2 >> - } >> - >> - override def map(labeledVector: LabeledVector): >> LabeledVector = { >> + input.mapWithBcVariable(metrics){ >> + (labeledVector, metrics) => { >> 
+ val (broadcastMean, broadcastStd) = metrics >> val LabeledVector(label, vector) = labeledVector >> var breezeVector = vector.asBreeze >> >> @@ -273,7 +252,7 @@ object StandardScaler { >> breezeVector = (breezeVector :* std) + mean >> LabeledVector(label, breezeVector.fromBreeze[Vector]) >> } >> - }).withBroadcastSet(metrics, "broadcastedMetrics") >> + } >> } >> >> case None => >> >> >