I would think reassigning keys should work in most cases.
The only caveat is that, technically, Spark's contract implies that side
effects should be idempotent if a task is retried. That might be a problem in
the specific scenario where the object tree comes out of the block cache: it
can stay there, and the task can be retried against it. But specifically w.r.t.
this key assignment I don't see any problem, since the action is obviously
idempotent even if this code is run multiple times on the same
(key, block) pair. This part should be good IMO.
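
To illustrate the idempotence point, here is a minimal sketch in plain Scala. It does not use Mahout or Spark: plain arrays stand in for the (keys, block) pair, and the names KeyReassignmentSketch, sqDist, nearest and assignKeys are hypothetical, made up for this illustration only. The new key depends only on the row contents and the centroids, never on the previous key, so running the assignment twice leaves the keys unchanged.

```scala
object KeyReassignmentSketch {
  // Squared Euclidean distance between two points of equal dimension.
  def sqDist(a: Array[Double], b: Array[Double]): Double =
    a.indices.map(i => (a(i) - b(i)) * (a(i) - b(i))).sum

  // Index of the centroid nearest to `point` (naive linear search).
  def nearest(point: Array[Double], centroids: Array[Array[Double]]): Int = {
    var best = 0
    var bestDist = Double.PositiveInfinity
    for (c <- centroids.indices) {
      val d = sqDist(point, centroids(c))
      if (d < bestDist) { bestDist = d; best = c }
    }
    best
  }

  // Overwrites the keys in place, similar in spirit to the mapBlock closure.
  // Each new key is a function of the row and the centroids only, so a
  // repeated run on the same (keys, block) pair produces the same keys.
  def assignKeys(keys: Array[Int],
                 block: Array[Array[Double]],
                 centroids: Array[Array[Double]]): Unit =
    for (row <- block.indices) keys(row) = nearest(block(row), centroids)
}
```

Calling assignKeys a second time on the same arrays models a retried task that sees the already-modified block.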

On Fri, Apr 14, 2017 at 2:26 AM, KHATWANI PARTH BHARAT <
h2016...@pilani.bits-pilani.ac.in> wrote:

> @Dmitriy Sir,
> In the k-means code above, I think I am doing the following incorrectly:
>
> Assigning the closest centroid index to the row keys of the DRM
>
>     //11. Iterating over the Data Matrix(in DrmLike[Int] format) to calculate
>     // the initial centriods
>     dataDrmX.mapBlock() {
>       case (keys, block) =>
>         for (row <- 0 until block.nrow) {
>           var dataPoint = block(row, ::)
>
>           //12. findTheClosestCentriod find the closest centriod to the
>           // Data point specified by "dataPoint"
>           val closesetIndex = findTheClosestCentriod(dataPoint, centriods)
>
>           //13. assigning closest index to key
>           keys(row) = closesetIndex
>         }
>         keys -> block
>     }
>
> In step 12 I am finding the centroid closest to the current dataPoint.
> In step 13 I am assigning closesetIndex to the key of the corresponding
> row represented by dataPoint.
> I think I am doing step 13 incorrectly.
>
> Also, I am unable to find a proper reference for this in the reference
> links which you mentioned above.
>
>
> Thanks & Regards
> Parth Khatwani
>
>
>
>
>
> On Thu, Apr 13, 2017 at 6:24 PM, KHATWANI PARTH BHARAT <
> h2016...@pilani.bits-pilani.ac.in> wrote:
>
> > Dmitriy Sir,
> > I have created a GitHub branch with the initial k-means code:
> > <https://github.com/parth2691/Spark_Mahout/tree/Dmitriy-Lyubimov>
> >
> >
> > Thanks & Regards
> > Parth Khatwani
> >
> > On Thu, Apr 13, 2017 at 3:19 AM, Andrew Palumbo <ap....@outlook.com>
> > wrote:
> >
> >> +1 to creating a branch.
> >>
> >>
> >>
> >>
> >>
> >> -------- Original message --------
> >> From: Dmitriy Lyubimov <dlie...@gmail.com>
> >> Date: 04/12/2017 11:25 (GMT-08:00)
> >> To: dev@mahout.apache.org
> >> Subject: Re: Trying to write the KMeans Clustering Using "Apache Mahout
> >> Samsara"
> >>
> >> can't say I can read this code well, formatted that way...
> >>
> >> it would seem to me that the code is not using the broadcast variable and
> >> instead is using a closure variable. that's the only thing I can
> >> immediately see by looking in the middle of it.
> >>
> >> it would be better if you created a branch on github for that code; that
> >> would allow for easy check-outs and comments.
> >>
> >> -d
> >>
> >> On Wed, Apr 12, 2017 at 10:29 AM, KHATWANI PARTH BHARAT <
> >> h2016...@pilani.bits-pilani.ac.in> wrote:
> >>
> >> > @Dmitriy Sir
> >> >
> >> > I have completed the k-means code as per the algorithm you outlined
> >> > above.
> >> >
> >> > My code is as follows.
> >> >
> >> > This code works fine up to step 10.
> >> >
> >> > In step 11 I am assigning the new centroid index to the corresponding
> >> > row key of the data point in the matrix.
> >> > I think I am doing something wrong in step 11; maybe I am using
> >> > incorrect syntax.
> >> >
> >> > Can you help me find out what I am doing wrong?
> >> >
> >> >
> >> > //start of main method
> >> >
> >> > def main(args: Array[String]) {
> >> >      //1. initialize the spark and mahout context
> >> >     val conf = new SparkConf()
> >> >       .setAppName("DRMExample")
> >> >       .setMaster(args(0))
> >> >       .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
> >> >       .set("spark.kryo.registrator", "org.apache.mahout.sparkbindings.io.MahoutKryoRegistrator")
> >> >     implicit val sc = new SparkDistributedContext(new SparkContext(conf))
> >> >
> >> >     //2. read the data file and save it in the rdd
> >> >     val lines = sc.textFile(args(1))
> >> >
> >> >     //3. convert data read in as string in to array of double
> >> >     val test = lines.map(line => line.split('\t').map(_.toDouble))
> >> >
> >> >     //4. add a column having value 1 in array of double this will
> >> >     // create something like (1 | D)',  which will be used while calculating
> >> >     // (1 | D)'
> >> >     val augumentedArray = test.map(addCentriodColumn _)
> >> >
> >> >     //5. convert rdd of array of double in rdd of DenseVector
> >> >     val rdd = augumentedArray.map(dvec(_))
> >> >
> >> >     //6. convert rdd to DrmRdd
> >> >     val rddMatrixLike: DrmRdd[Int] = rdd.zipWithIndex.map { case (v, idx) => (idx.toInt, v) }
> >> >
> >> >     //7. convert DrmRdd to CheckpointedDrm[Int]
> >> >     val matrix = drmWrap(rddMatrixLike)
> >> >
> >> >     //8. seperating the column having all ones created in step 4 and will use
> >> >     // it later
> >> >     val oneVector = matrix(::, 0 until 1)
> >> >
> >> >     //9. final input data in DrmLike[Int] format
> >> >     val dataDrmX = matrix(::, 1 until 4)
> >> >
> >> >     //9. Sampling to select initial centriods
> >> >     val centriods = drmSampleKRows(dataDrmX, 2, false)
> >> >     centriods.size
> >> >
> >> >     //10. Broad Casting the initial centriods
> >> >     val broadCastMatrix = drmBroadcast(centriods)
> >> >
> >> >     //11. Iterating over the Data Matrix(in DrmLike[Int] format) to calculate
> >> >     // the initial centriods
> >> >     dataDrmX.mapBlock() {
> >> >       case (keys, block) =>
> >> >         for (row <- 0 until block.nrow) {
> >> >           var dataPoint = block(row, ::)
> >> >
> >> >           //12. findTheClosestCentriod find the closest centriod to the
> >> >           // Data point specified by "dataPoint"
> >> >           val closesetIndex = findTheClosestCentriod(dataPoint, centriods)
> >> >
> >> >           //13. assigning closest index to key
> >> >           keys(row) = closesetIndex
> >> >         }
> >> >         keys -> block
> >> >     }
> >> >
> >> >     //14. Calculating the (1|D)
> >> >     val b = (oneVector cbind dataDrmX)
> >> >
> >> >     //15. Aggregating Transpose (1|D)'
> >> >     val bTranspose = (oneVector cbind dataDrmX).t
> >> >     // after step 15 bTranspose will have data in the following format
> >> >     /*(n+1)*K where n=dimension of the data point, K=number of clusters
> >> >     * zeroth row will contain the count of points assigned to each cluster
> >> >     * assuming 3d data points
> >> >     *
> >> >     */
> >> >
> >> >     val nrows = b.nrow.toInt
> >> >
> >> >     //16. slicing the count vectors out
> >> >     val pointCountVectors = drmBroadcast(b(0 until 1, ::).collect(0, ::))
> >> >     val vectorSums = b(1 until nrows, ::)
> >> >
> >> >     //17. dividing the data point by count vector
> >> >     vectorSums.mapBlock() {
> >> >       case (keys, block) =>
> >> >         for (row <- 0 until block.nrow) {
> >> >           block(row, ::) /= pointCountVectors
> >> >         }
> >> >         keys -> block
> >> >     }
> >> >
> >> >     //18. seperating the count vectors
> >> >     val newCentriods = vectorSums.t(::,1 until centriods.size)
> >> >
> >> >     //19. iterate over the above code till convergence criteria is meet
> >> >   }//end of main method
> >> >
> >> >
> >> >
> >> >   // method to find the closest centriod to data point( vec: Vector in the arguments)
> >> >   def findTheClosestCentriod(vec: Vector, matrix: Matrix): Int = {
> >> >     var index = 0
> >> >     var closest = Double.PositiveInfinity
> >> >     for (row <- 0 until matrix.nrow) {
> >> >       val squaredSum = ssr(vec, matrix(row, ::))
> >> >       val tempDist = Math.sqrt(ssr(vec, matrix(row, ::)))
> >> >       if (tempDist < closest) {
> >> >         closest = tempDist
> >> >         index = row
> >> >       }
> >> >     }
> >> >     index
> >> >   }
> >> >
> >> >   //calculating the sum of squared distance between the points(Vectors)
> >> >   def ssr(a: Vector, b: Vector): Double = {
> >> >     (a - b) ^= 2 sum
> >> >   }
> >> >
> >> >   //method used to create (1|D)
> >> >   def addCentriodColumn(arg: Array[Double]): Array[Double] = {
> >> >     val newArr = new Array[Double](arg.length + 1)
> >> >     newArr(0) = 1.0;
> >> >     for (i <- 0 until (arg.size)) {
> >> >       newArr(i + 1) = arg(i);
> >> >     }
> >> >     newArr
> >> >   }
> >> >
> >> >
> >> > Thanks & Regards
> >> > Parth Khatwani
> >> >
> >> >
> >> >
> >> > On Mon, Apr 3, 2017 at 7:37 PM, KHATWANI PARTH BHARAT <
> >> > h2016...@pilani.bits-pilani.ac.in> wrote:
> >> >
> >> > >
> >> > > ---------- Forwarded message ----------
> >> > > From: Dmitriy Lyubimov <dlie...@gmail.com>
> >> > > Date: Fri, Mar 31, 2017 at 11:34 PM
> >> > > Subject: Re: Trying to write the KMeans Clustering Using "Apache
> >> Mahout
> >> > > Samsara"
> >> > > To: "dev@mahout.apache.org" <dev@mahout.apache.org>
> >> > >
> >> > >
> >> > > ps1 this assumes row-wise construction of A based on training set of m
> >> > > n-dimensional points.
> >> > > ps2 since we are doing multiple passes over A it may make sense to make
> >> > > sure it is committed to spark cache (by using checkpoint api), if spark
> >> > > is used
> >> > >
> >> > > On Fri, Mar 31, 2017 at 10:53 AM, Dmitriy Lyubimov <
> dlie...@gmail.com
> >> >
> >> > > wrote:
> >> > >
> >> > > > here is the outline. For details of APIs, please refer to samsara
> >> > > > manual [2], i will not be repeating it.
> >> > > >
> >> > > > Assume your training data input is m x n matrix A. For simplicity
> >> > > > let's assume it's a DRM with int row keys, i.e., DrmLike[Int].
> >> > > >
> >> > > > Initialization:
> >> > > >
> >> > > > First, classic k-means starts by selecting initial clusters, by
> >> > > > sampling them out. You can do that by using sampling api [1], thus
> >> > > > forming a k x n in-memory matrix C (current centroids). C is
> >> > > > therefore of Mahout's Matrix type.
> >> > > >
> >> > > > You then proceed by alternating between cluster assignments and
> >> > > > recomputing centroid matrix C till convergence based on some test
> >> > > > or simply limited by epoch count budget, your choice.
> >> > > >
> >> > > > Cluster assignments: here, we go over current generation of A and
> >> > > > recompute centroid indexes for each row in A. Once we recompute
> >> > > > index, we put it into the row key. You can do that by assigning
> >> > > > centroid indices to keys of A using operator mapblock() (details in
> >> > > > [2], [3], [4]). You also need to broadcast C in order to be able to
> >> > > > access it in efficient manner inside mapblock() closure. Examples of
> >> > > > that are plenty given in [2]. Essentially, in mapblock, you'd reform
> >> > > > the row keys to reflect cluster index in C. While going over A, you'd
> >> > > > have a "nearest neighbor" problem to solve for the row of A and
> >> > > > centroids C. This is the bulk of computation really, and there are a
> >> > > > few tricks there that can speed this step up in both exact and
> >> > > > approximate manner, but you can start with a naive search.
> >> > > >
> >> > > > Centroid recomputation:
> >> > > > once you assigned centroids to the keys of matrix A, you'd want to
> >> > > > do an aggregating transpose of A to compute essentially the average
> >> > > > of the rows of A grouped by the centroid key. The trick is to do a
> >> > > > computation of (1|A)' which will result in a matrix of the shape
> >> > > > (Counts/sums of cluster rows). This is the part i find difficult to
> >> > > > explain without latex graphics.
> >> > > >
> >> > > > In Samsara, construction of (1|A)' corresponds to DRM expression
> >> > > >
> >> > > > (1 cbind A).t (again, see [2]).
> >> > > >
> >> > > > So when you compute, say,
> >> > > >
> >> > > > B = (1 | A)',
> >> > > >
> >> > > > then B is (n+1) x k, so each column contains a vector corresponding
> >> > > > to a cluster 1..k. In such column, the first element would be # of
> >> > > > points in the cluster, and the rest of it would correspond to sum of
> >> > > > all points. So in order to arrive to an updated matrix C, we need to
> >> > > > collect B into memory, and slice out counters (first row) from the
> >> > > > rest of it.
> >> > > >
> >> > > > So, to compute C:
> >> > > >
> >> > > > C <- B (2:,:) each row divided by B(1,:)
> >> > > >
> >> > > > (watch out for empty clusters with 0 elements, this will cause lack
> >> > > > of convergence and NaNs in the newly computed C).
> >> > > >
> >> > > > This operation obviously uses subblocking and row-wise iteration
> >> > > > over B, for which i am again making reference to [2].
> >> > > >
> >> > > >
> >> > > > [1] https://github.com/apache/mahout/blob/master/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala#L149
> >> > > >
> >> > > > [2] Samsara manual, a bit dated but viable:
> >> > > > http://apache.github.io/mahout/doc/ScalaSparkBindings.html
> >> > > >
> >> > > > [3] scaladoc, again, dated but largely viable for the purpose of
> >> > > > this exercise:
> >> > > > http://apache.github.io/mahout/0.10.1/docs/mahout-math-scala/index.htm
> >> > > >
> >> > > > [4] mapblock etc.
> >> > > > http://apache.github.io/mahout/0.10.1/docs/mahout-math-scala/index.html#org.apache.mahout.math.drm.RLikeDrmOps
> >> > > >
> >> > > > On Fri, Mar 31, 2017 at 9:54 AM, KHATWANI PARTH BHARAT <
> >> > > > h2016...@pilani.bits-pilani.ac.in> wrote:
> >> > > >
> >> > > >> @Dmitriy can you please tell me again the approach to move ahead.
> >> > > >>
> >> > > >>
> >> > > >> Thanks
> >> > > >> Parth Khatwani
> >> > > >>
> >> > > >>
> >> > > >> On Fri, Mar 31, 2017 at 10:15 PM, KHATWANI PARTH BHARAT <
> >> > > >> h2016...@pilani.bits-pilani.ac.in> wrote:
> >> > > >>
> >> > > >> > yes, I am unable to figure out the way ahead.
> >> > > >> > Like how to create the augmented matrix A := (0|D) which you
> >> > > >> > have mentioned.
> >> > > >> >
> >> > > >> >
> >> > > >> > On Fri, Mar 31, 2017 at 10:10 PM, Dmitriy Lyubimov <
> >> > dlie...@gmail.com
> >> > > >
> >> > > >> > wrote:
> >> > > >> >
> >> > > >> >> has my reply to your post on @user been a bit confusing?
> >> > > >> >>
> >> > > >> >> On Fri, Mar 31, 2017 at 8:40 AM, KHATWANI PARTH BHARAT <
> >> > > >> >> h2016...@pilani.bits-pilani.ac.in> wrote:
> >> > > >> >>
> >> > > >> >> > Sir,
> >> > > >> >> > I am trying to write the k-means clustering algorithm using
> >> > > >> >> > Mahout Samsara, but I am a bit confused about how to leverage
> >> > > >> >> > the Distributed Row Matrix for this. Can anybody help me
> >> > > >> >> > with it?
> >> > > >> >> >
> >> > > >> >> >
> >> > > >> >> >
> >> > > >> >> >
> >> > > >> >> >
> >> > > >> >> > Thanks
> >> > > >> >> > Parth Khatwani
> >> > > >> >> >
> >> > > >> >>
> >> > > >> >
> >> > > >> >
> >> > > >>
> >> > > >
> >> > > >
> >> > >
> >> > >
> >> >
> >>
> >
> >
>
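
As a rough illustration of the outline quoted above (cluster assignment, then centroid recomputation from per-cluster counts and sums), here is a minimal single-machine sketch in plain Scala. It does not use Mahout or Spark: plain arrays stand in for A and C, and KMeansStepSketch, sqDist, nearest and step are hypothetical names made up for this illustration. Keeping the previous centroid for an empty cluster is an assumption of this sketch, just one possible way to avoid the NaNs mentioned in the outline.

```scala
object KMeansStepSketch {
  // Squared Euclidean distance between two points of equal dimension.
  def sqDist(a: Array[Double], b: Array[Double]): Double = {
    var s = 0.0
    for (i <- a.indices) { val d = a(i) - b(i); s += d * d }
    s
  }

  // Naive nearest-centroid search: index of the row of `c` closest to `point`.
  def nearest(point: Array[Double], c: Array[Array[Double]]): Int = {
    var best = 0
    var bestDist = Double.PositiveInfinity
    for (j <- c.indices) {
      val d = sqDist(point, c(j))
      if (d < bestDist) { bestDist = d; best = j }
    }
    best
  }

  // One iteration: assign every row of `a` to its nearest centroid, then
  // rebuild the centroids as (sum of assigned rows) / (count of assigned rows).
  def step(a: Array[Array[Double]], c: Array[Array[Double]]): Array[Array[Double]] = {
    val k = c.length
    val n = c(0).length
    val counts = new Array[Double](k)     // plays the role of the first row of B
    val sums = Array.ofDim[Double](k, n)  // plays the role of the remaining rows of B
    for (row <- a) {
      val j = nearest(row, c)
      counts(j) += 1.0
      for (i <- 0 until n) sums(j)(i) += row(i)
    }
    Array.tabulate(k) { j =>
      // Assumption: an empty cluster keeps its old centroid instead of
      // dividing by zero and producing NaNs.
      if (counts(j) == 0.0) c(j).clone()
      else sums(j).map(_ / counts(j))
    }
  }
}
```

Repeating step until the centroids stop moving, or until an epoch budget runs out, gives the full loop described in the outline.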
