@Dmitriy Sir,
In the K means code above I think i am doing the following Incorrectly

Assigning the closest centriod index to the Row Keys of DRM

//11. Iterating over the Data Matrix(in DrmLike[Int] format) to calculate
the initial centriods
    dataDrmX.mapBlock() {
      case (keys, block) =>
        for (row <- 0 until block.nrow) {
          var dataPoint = block(row, ::)

          //12. findTheClosestCentriod find the closest centriod to the
Data point specified by "dataPoint"
          val closesetIndex = findTheClosestCentriod(dataPoint, centriods)

          //13. assigning closest index to key
          keys(row) = closesetIndex
        }
        keys -> block
    }

 in step 12 i am finding the centriod closest to the current dataPoint
 in step13 i am assigning the closesetIndex to the key of the corresponding
row represented by the dataPoint
I think i am doing step13 incorrectly.

Also i am unable to find the proper reference for the same in the reference
links which you have mentioned above


Thanks & Regards
Parth Khatwani





On Thu, Apr 13, 2017 at 6:24 PM, KHATWANI PARTH BHARAT <
h2016...@pilani.bits-pilani.ac.in> wrote:

> Dmitriy Sir,
> I have Created a github branch Github Branch Having Initial Kmeans Code
> <https://github.com/parth2691/Spark_Mahout/tree/Dmitriy-Lyubimov>
>
>
> Thanks & Regards
> Parth Khatwani
>
> On Thu, Apr 13, 2017 at 3:19 AM, Andrew Palumbo <ap....@outlook.com>
> wrote:
>
>> +1 to creating a branch.
>>
>>
>>
>> Sent from my Verizon Wireless 4G LTE smartphone
>>
>>
>> -------- Original message --------
>> From: Dmitriy Lyubimov <dlie...@gmail.com>
>> Date: 04/12/2017 11:25 (GMT-08:00)
>> To: dev@mahout.apache.org
>> Subject: Re: Trying to write the KMeans Clustering Using "Apache Mahout
>> Samsara"
>>
>> can't say i can read this code well formatted that way...
>>
>> it would seem to me that the code is not using the broadcast variable and
>> instead is using closure variable. that's the only thing i can immediately
>> see by looking in the middle of it.
>>
>> it would be better if you created a branch on github for that code that
>> would allow for easy check-outs and comments.
>>
>> -d
>>
>> On Wed, Apr 12, 2017 at 10:29 AM, KHATWANI PARTH BHARAT <
>> h2016...@pilani.bits-pilani.ac.in> wrote:
>>
>> > @Dmitriy Sir
>> >
>> > I have completed the Kmeans code as per the algorithm you have Outline
>> > above
>> >
>> > My code is as follows
>> >
>> > This code works fine till step number 10
>> >
>> > In step 11 i am assigning the new centriod index  to corresponding row
>> key
>> > of data Point in the matrix
>> > I think i am doing something wrong in step 11 may be i am using
>> incorrect
>> > syntax
>> >
>> > Can you help me find out what am i doing wrong.
>> >
>> >
>> > //start of main method
>> >
>> > def main(args: Array[String]) {
>> >      //1. initialize the spark and mahout context
>> >     val conf = new SparkConf()
>> >       .setAppName("DRMExample")
>> >       .setMaster(args(0))
>> >       .set("spark.serializer", "org.apache.spark.serializer.
>> > KryoSerializer")
>> >       .set("spark.kryo.registrator",
>> > "org.apache.mahout.sparkbindings.io.MahoutKryoRegistrator")
>> >     implicit val sc = new SparkDistributedContext(new
>> SparkContext(conf))
>> >
>> >     //2. read the data file and save it in the rdd
>> >     val lines = sc.textFile(args(1))
>> >
>> >     //3. convert data read in as string in to array of double
>> >     val test = lines.map(line => line.split('\t').map(_.toDouble))
>> >
>> >     //4. add a column having value 1 in array of double this will
>> > create something like (1 | D)',  which will be used while calculating
>> > (1 | D)'
>> >     val augumentedArray = test.map(addCentriodColumn _)
>> >
>> >     //5. convert rdd of array of double in rdd of DenseVector
>> >     val rdd = augumentedArray.map(dvec(_))
>> >
>> >     //6. convert rdd to DrmRdd
>> >     val rddMatrixLike: DrmRdd[Int] = rdd.zipWithIndex.map { case (v,
>> > idx) => (idx.toInt, v) }        //7. convert DrmRdd to
>> > CheckpointedDrm[Int]    val matrix = drmWrap(rddMatrixLike)    //8.
>> > seperating the column having all ones created in step 4 and will use
>> > it later    val oneVector = matrix(::, 0 until 1)        //9. final
>> > input data in DrmLike[Int] format    val dataDrmX = matrix(::, 1 until
>> > 4)            //9. Sampling to select initial centriods    val
>> > centriods = drmSampleKRows(dataDrmX, 2, false)    centriods.size
>> > //10. Broad Casting the initial centriods    val broadCastMatrix =
>> > drmBroadcast(centriods)            //11. Iterating over the Data
>> > Matrix(in DrmLike[Int] format) to calculate the initial centriods
>> > dataDrmX.mapBlock() {      case (keys, block) =>        for (row <- 0
>> > until block.nrow) {          var dataPoint = block(row, ::)
>> >         //12. findTheClosestCentriod find the closest centriod to the
>> > Data point specified by "dataPoint"          val closesetIndex =
>> > findTheClosestCentriod(dataPoint, centriods)                    //13.
>> > assigning closest index to key          keys(row) = closesetIndex
>> >   }        keys -> block    }
>> >
>> >     //14. Calculating the (1|D)      val b = (oneVector cbind
>> > dataDrmX)        //15. Aggregating Transpose (1|D)'    val bTranspose
>> > = (oneVector cbind dataDrmX).t    // after step 15 bTranspose will
>> > have data in the following format        /*(n+1)*K where n=dimension
>> > of the data point, K=number of clusters    * zeroth row will contain
>> > the count of points assigned to each cluster    * assuming 3d data
>> > points     *     */
>> >
>> >
>> >     val nrows = b.nrow.toInt    //16. slicing the count vectors out
>> >  val pointCountVectors = drmBroadcast(b(0 until 1, ::).collect(0, ::))
>> >    val vectorSums = b(1 until nrows, ::)    //17. dividing the data
>> > point by count vector    vectorSums.mapBlock() {      case (keys,
>> > block) =>        for (row <- 0 until block.nrow) {          block(row,
>> > ::) /= pointCountVectors        }        keys -> block    }    //18.
>> > seperating the count vectors    val newCentriods = vectorSums.t(::,1
>> > until centriods.size)            //19. iterate over the above code
>> > till convergence criteria is meet   }//end of main method
>> >
>> >
>> >
>> >   // method to find the closest centriod to data point( vec: Vector
>> > in the arguments)  def findTheClosestCentriod(vec: Vector, matrix:
>> > Matrix): Int = {
>> >     var index = 0
>> >     var closest = Double.PositiveInfinity
>> >     for (row <- 0 until matrix.nrow) {
>> >       val squaredSum = ssr(vec, matrix(row, ::))
>> >       val tempDist = Math.sqrt(ssr(vec, matrix(row, ::)))
>> >       if (tempDist < closest) {
>> >         closest = tempDist
>> >         index = row
>> >       }
>> >     }
>> >     index
>> >   }
>> >
>> >    //calculating the sum of squared distance between the points(Vectors)
>> >   def ssr(a: Vector, b: Vector): Double = {
>> >     (a - b) ^= 2 sum
>> >   }
>> >
>> >   //method used to create (1|D)
>> >   def addCentriodColumn(arg: Array[Double]): Array[Double] = {
>> >     val newArr = new Array[Double](arg.length + 1)
>> >     newArr(0) = 1.0;
>> >     for (i <- 0 until (arg.size)) {
>> >       newArr(i + 1) = arg(i);
>> >     }
>> >     newArr
>> >   }
>> >
>> >
>> > Thanks & Regards
>> > Parth Khatwani
>> >
>> >
>> >
>> > On Mon, Apr 3, 2017 at 7:37 PM, KHATWANI PARTH BHARAT <
>> > h2016...@pilani.bits-pilani.ac.in> wrote:
>> >
>> > >
>> > > ---------- Forwarded message ----------
>> > > From: Dmitriy Lyubimov <dlie...@gmail.com>
>> > > Date: Fri, Mar 31, 2017 at 11:34 PM
>> > > Subject: Re: Trying to write the KMeans Clustering Using "Apache
>> Mahout
>> > > Samsara"
>> > > To: "dev@mahout.apache.org" <dev@mahout.apache.org>
>> > >
>> > >
>> > > ps1 this assumes row-wise construction of A based on training set of m
>> > > n-dimensional points.
>> > > ps2 since we are doing multiple passes over A it may make sense to
>> make
>> > > sure it is committed to spark cache (by using checkpoint api), if
>> spark
>> > is
>> > > used
>> > >
>> > > On Fri, Mar 31, 2017 at 10:53 AM, Dmitriy Lyubimov <dlie...@gmail.com
>> >
>> > > wrote:
>> > >
>> > > > here is the outline. For details of APIs, please refer to samsara
>> > manual
>> > > > [2], i will not be be repeating it.
>> > > >
>> > > > Assume your training data input is m x n matrix A. For simplicity
>> let's
>> > > > assume it's a DRM with int row keys, i.e., DrmLike[Int].
>> > > >
>> > > > Initialization:
>> > > >
>> > > > First, classic k-means starts by selecting initial clusters, by
>> > sampling
>> > > > them out. You can do that by using sampling api [1], thus forming a
>> k
>> > x n
>> > > > in-memory matrix C (current centroids). C is therefore of Mahout's
>> > Matrix
>> > > > type.
>> > > >
>> > > > You the proceed by alternating between cluster assignments and
>> > > > recompupting centroid matrix C till convergence based on some test
>> or
>> > > > simply limited by epoch count budget, your choice.
>> > > >
>> > > > Cluster assignments: here, we go over current generation of A and
>> > > > recompute centroid indexes for each row in A. Once we recompute
>> index,
>> > we
>> > > > put it into the row key . You can do that by assigning centroid
>> indices
>> > > to
>> > > > keys of A using operator mapblock() (details in [2], [3], [4]). You
>> > also
>> > > > need to broadcast C in order to be able to access it in efficient
>> > manner
>> > > > inside mapblock() closure. Examples of that are plenty given in [2].
>> > > > Essentially, in mapblock, you'd reform the row keys to reflect
>> cluster
>> > > > index in C. while going over A, you'd have a "nearest neighbor"
>> problem
>> > > to
>> > > > solve for the row of A and centroids C. This is the bulk of
>> computation
>> > > > really, and there are a few tricks there that can speed this step
>> up in
>> > > > both exact and approximate manner, but you can start with a naive
>> > search.
>> > > >
>> > > > Centroid recomputation:
>> > > > once you assigned centroids to the keys of marix A, you'd want to
>> do an
>> > > > aggregating transpose of A to compute essentially average of row A
>> > > grouped
>> > > > by the centroid key. The trick is to do a computation of (1|A)'
>> which
>> > > will
>> > > > results in a matrix of the shape (Counts/sums of cluster rows).
>> This is
>> > > the
>> > > > part i find difficult to explain without a latex graphics.
>> > > >
>> > > > In Samsara, construction of (1|A)' corresponds to DRM expression
>> > > >
>> > > > (1 cbind A).t (again, see [2]).
>> > > >
>> > > > So when you compute, say,
>> > > >
>> > > > B = (1 | A)',
>> > > >
>> > > > then B is (n+1) x k, so each column contains a vector corresponding
>> to
>> > a
>> > > > cluster 1..k. In such column, the first element would be # of
>> points in
>> > > the
>> > > > cluster, and the rest of it would correspond to sum of all points.
>> So
>> > in
>> > > > order to arrive to an updated matrix C, we need to collect B into
>> > memory,
>> > > > and slice out counters (first row) from the rest of it.
>> > > >
>> > > > So, to compute C:
>> > > >
>> > > > C <- B (2:,:) each row divided by B(1,:)
>> > > >
>> > > > (watch out for empty clusters with 0 elements, this will cause lack
>> of
>> > > > convergence and NaNs in the newly computed C).
>> > > >
>> > > > This operation obviously uses subblocking and row-wise iteration
>> over
>> > B,
>> > > > for which i am again making reference to [2].
>> > > >
>> > > >
>> > > > [1] https://github.com/apache/mahout/blob/master/math-scala/
>> > > > src/main/scala/org/apache/mahout/math/drm/package.scala#L149
>> > > >
>> > > > [2], Sasmara manual, a bit dated but viable, http://apache.github.
>> > > > io/mahout/doc/ScalaSparkBindings.html
>> > > >
>> > > > [3] scaladoc, again, dated but largely viable for the purpose of
>> this
>> > > > exercise:
>> > > > http://apache.github.io/mahout/0.10.1/docs/mahout-math-
>> scala/index.htm
>> > > >
>> > > > [4] mapblock etc. http://apache.github.io/mahout
>> /0.10.1/docs/mahout-
>> > > > math-scala/index.html#org.apache.mahout.math.drm.RLikeDrmOps
>> > > >
>> > > > On Fri, Mar 31, 2017 at 9:54 AM, KHATWANI PARTH BHARAT <
>> > > > h2016...@pilani.bits-pilani.ac.in> wrote:
>> > > >
>> > > >> @Dmitriycan you please again tell me the approach to move ahead.
>> > > >>
>> > > >>
>> > > >> Thanks
>> > > >> Parth Khatwani
>> > > >>
>> > > >>
>> > > >> On Fri, Mar 31, 2017 at 10:15 PM, KHATWANI PARTH BHARAT <
>> > > >> h2016...@pilani.bits-pilani.ac.in> wrote:
>> > > >>
>> > > >> > yes i am unable to figure out the way ahead.
>> > > >> > Like how to create the augmented matrix A := (0|D) which you have
>> > > >> > mentioned.
>> > > >> >
>> > > >> >
>> > > >> > On Fri, Mar 31, 2017 at 10:10 PM, Dmitriy Lyubimov <
>> > dlie...@gmail.com
>> > > >
>> > > >> > wrote:
>> > > >> >
>> > > >> >> was my reply for your post on @user has been a bit confusing?
>> > > >> >>
>> > > >> >> On Fri, Mar 31, 2017 at 8:40 AM, KHATWANI PARTH BHARAT <
>> > > >> >> h2016...@pilani.bits-pilani.ac.in> wrote:
>> > > >> >>
>> > > >> >> > Sir,
>> > > >> >> > I am trying to write the kmeans clustering algorithm using
>> Mahout
>> > > >> >> Samsara
>> > > >> >> > but i am bit confused
>> > > >> >> > about how to leverage Distributed Row Matrix for the same. Can
>> > > >> anybody
>> > > >> >> help
>> > > >> >> > me with same.
>> > > >> >> >
>> > > >> >> >
>> > > >> >> >
>> > > >> >> >
>> > > >> >> >
>> > > >> >> > Thanks
>> > > >> >> > Parth Khatwani
>> > > >> >> >
>> > > >> >>
>> > > >> >
>> > > >> >
>> > > >>
>> > > >
>> > > >
>> > >
>> > >
>> >
>>
>
>

Reply via email to