@Trevor, following is the link to the GitHub branch containing the KMeans code and the sample program (which we are discussing above) that I am using to figure out what I am doing wrong in the KMeans code: https://github.com/parth2691/Spark_Mahout/tree/Dmitriy-Lyubimov
Thanks & Regards
Parth

On Sat, Apr 22, 2017 at 1:26 AM, KHATWANI PARTH BHARAT <h2016...@pilani.bits-pilani.ac.in> wrote:

@Trevor

I was trying to write *KMeans* using the Mahout DRM, as per the algorithm outlined by Dmitriy. I was facing the problem of assigning cluster IDs to the row keys. For example, consider the matrix below, where columns 1 to 3 contain the data points and column 0 contains the count for the point:

    {
     0 => {0: 1.0, 1: 1.0, 2: 1.0, 3: 3.0}
     1 => {0: 1.0, 1: 2.0, 2: 3.0, 3: 4.0}
     2 => {0: 1.0, 1: 3.0, 2: 4.0, 3: 5.0}
     3 => {0: 1.0, 1: 4.0, 2: 5.0, 3: 6.0}
    }

Now, after calculating which centroid is closest to the data point at the zeroth index, I am trying to assign that centroid's index to the *row key*. Suppose every data point is assigned to the centroid at index 1; I then assign key = 1 to each and every row using the code below:

    val drm2 = A.mapBlock() {
      case (keys, block) =>
        for (row <- 0 until keys.size) {
          // assigning 1 to each row key
          keys(row) = 1
        }
        (keys, block)
    }

I want the above matrix to end up in this form:

    {
     1 => {0: 1.0, 1: 1.0, 2: 1.0, 3: 3.0}
     1 => {0: 1.0, 1: 2.0, 2: 3.0, 3: 4.0}
     1 => {0: 1.0, 1: 3.0, 2: 4.0, 3: 5.0}
     1 => {0: 1.0, 1: 4.0, 2: 5.0, 3: 6.0}
    }

but it turns out to be this:

    {
     0 => {}
     1 => {0: 1.0, 1: 4.0, 2: 5.0, 3: 6.0}
     2 => {}
     3 => {}
    }

I am confused whether assigning new key values to the row index is really done through the line

    keys(row) = 1

or whether there is some other way. I am not able to find any useful links or references on the internet; even Andrew and Dmitriy's book does not have a proper reference for the above-mentioned issue.

Thanks & Regards
Parth Khatwani

On Fri, Apr 21, 2017 at 10:06 PM, Trevor Grant <trevor.d.gr...@gmail.com> wrote:

OK, I dug into this before I read your question carefully; that was my bad.

Assuming you want the aggregating transpose of

    {
     0 => {0: 1.0, 1: 1.0, 2: 1.0, 3: 3.0}
     1 => {0: 1.0, 1: 2.0, 2: 3.0, 3: 4.0}
     2 => {0: 1.0, 1: 3.0, 2: 4.0, 3: 5.0}
     3 => {0: 1.0, 1: 4.0, 2: 5.0, 3: 6.0}
    }

to be

    {
     0 => {1: 5.0}   // (not 4.0) // and 6.0 in your example...
     1 => {1: 9.0}
     2 => {1: 12.0}
     3 => {1: 15.0}
    }

then why not replace the mapBlock statement as follows:

    val drm2 = (A(::, 1 until 4) cbind 0.0).mapBlock() {
      case (keys, block) =>
        for (row <- 0 until block.nrow) block(row, 3) = block(row, ::).sum
        (keys, block)
    }
    val aggTranspose = drm2(::, 3 until 4).t
    println("Result of aggregating transpose")
    println("" + aggTranspose.collect)

where we create an empty column and then fill it with the row sums.

A distributed rowSums fn would be nice for just such an occasion... sigh.

Let me know if that gets you going again. That was simpler than I thought; sorry for the delay on this.

PS
Candidly, I didn't explore further once I understood the question, but if you are going to collect this to the driver anyway (not sure if that is the case), A(::, 1 until 4).rowSums would also work.

Trevor Grant
Data Scientist
https://github.com/rawkintrevo
http://stackexchange.com/users/3002022/rawkintrevo
http://trevorgrant.org

*"Fortunate is he, who is able to know the causes of things." -Virgil*
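For reference, a minimal sketch of the distributed rowSums helper Trevor wishes for above, built only from operations already used in this thread (scalar cbind, mapBlock, and column slicing); the name drmRowSums is made up here, and the imports are the usual Samsara ones:

    import org.apache.mahout.math.drm._
    import org.apache.mahout.math.drm.RLikeDrmOps._
    import org.apache.mahout.math.scalabindings.RLikeOps._

    // Row sums of a DRM, computed distributedly: append a zero column,
    // overwrite it with each row's sum (the zero contributes nothing),
    // then slice that column back out as an m x 1 DRM.
    def drmRowSums(a: DrmLike[Int]): DrmLike[Int] = {
      val n = a.ncol
      val withSums = (a cbind 0.0).mapBlock() { case (keys, block) =>
        for (row <- 0 until block.nrow)
          block(row, n) = block(row, ::).sum
        (keys, block)
      }
      withSums(::, n until n + 1)
    }

With such a helper, the snippet above would reduce to drmRowSums(A(::, 1 until 4)).t.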
On Thu, Apr 20, 2017 at 9:01 PM, KHATWANI PARTH BHARAT <h2016...@pilani.bits-pilani.ac.in> wrote:

@Trevor Sir,
I have attached the sample data file, and here is the link to the complete data file <https://drive.google.com/open?id=0Bxnnu_Ig2Et9QjZoM3dmY1V5WXM>.

Following is the link to the GitHub branch for the code:
https://github.com/parth2691/Spark_Mahout/tree/Dmitriy-Lyubimov

KmeansMahout.scala <https://github.com/parth2691/Spark_Mahout/blob/Dmitriy-Lyubimov/KmeansMahout.scala> is the complete code.

I have also made a sample program just to test assigning new values to the keys of a row matrix and taking the aggregating transpose. I think assigning new key values and the aggregating transpose are causing the main problem in the KMeans code. Following is the link to this code in the GitHub repo:
TestClusterAssign.scala <https://github.com/parth2691/Spark_Mahout/blob/Dmitriy-Lyubimov/TestClusterAssign.scala>

The above code contains hard-coded data. Following are the expected and actual outputs of the above code. The output of the 1st println, after the new cluster assignment, should be:

    {
     0 => {0: 1.0, 1: 1.0, 2: 1.0, 3: 3.0}
     1 => {0: 1.0, 1: 2.0, 2: 3.0, 3: 4.0}
     2 => {0: 1.0, 1: 3.0, 2: 4.0, 3: 5.0}
     3 => {0: 1.0, 1: 4.0, 2: 5.0, 3: 6.0}
    }

(here the zeroth column is used to store the centroid count, and columns 1, 2 and 3 contain the data), but it turns out to be this:

    {
     0 => {}
     1 => {0: 1.0, 1: 4.0, 2: 5.0, 3: 6.0}
     2 => {}
     3 => {}
    }

And the result of the aggregating transpose should be:

    {
     0 => {1: 4.0}
     1 => {1: 9.0}
     2 => {1: 12.0}
     3 => {1: 15.0}
    }

Thanks, Trevor, for such great help.

Best Regards
Parth

On Fri, Apr 21, 2017 at 4:20 AM, Trevor Grant <trevor.d.gr...@gmail.com> wrote:

Hey,

sorry for the delay; I was getting ready to tear into this.

Would you mind posting a small sample of data that you would expect this application to consume?

tg

Trevor Grant
Data Scientist
https://github.com/rawkintrevo
http://stackexchange.com/users/3002022/rawkintrevo
http://trevorgrant.org

*"Fortunate is he, who is able to know the causes of things." -Virgil*

On Tue, Apr 18, 2017 at 11:32 PM, KHATWANI PARTH BHARAT <h2016...@pilani.bits-pilani.ac.in> wrote:

@Dmitriy, @Trevor and @Andrew Sir,
I am still stuck at the above problem. Can you please help me out with it? I am unable to find the proper reference to solve the above issue.

Thanks & Regards
Parth Khatwani
On Sat, Apr 15, 2017 at 10:07 AM, KHATWANI PARTH BHARAT <h2016...@pilani.bits-pilani.ac.in> wrote:

@Dmitriy,
@Trevor and @Andrew

I have tried testing the row-key assignment issue mentioned in the above mail by writing a separate piece of code in which I assign a default value of 1 to each row key of the DRM and then take the aggregating transpose. I have committed this separate test code to the GitHub branch <https://github.com/parth2691/Spark_Mahout/tree/Dmitriy-Lyubimov>.

The code is as follows:

    val inCoreA = dense((1, 1, 2, 3), (1, 2, 3, 4), (1, 3, 4, 5), (1, 4, 5, 6))
    val A = drmParallelize(m = inCoreA)

    // mapBlock
    val drm2 = A.mapBlock() {
      case (keys, block) =>
        for (row <- 0 until keys.size) {
          // assigning 1 to each row key
          keys(row) = 1
        }
        (keys, block)
    }
    println("After New Cluster assignment")
    println("" + drm2.collect)

    val aggTranspose = drm2.t
    println("Result of aggregating transpose")
    println("" + aggTranspose.collect)

The output of the 1st println, after the new cluster assignment, should be:

    {
     0 => {0: 1.0, 1: 1.0, 2: 1.0, 3: 3.0}
     1 => {0: 1.0, 1: 2.0, 2: 3.0, 3: 4.0}
     2 => {0: 1.0, 1: 3.0, 2: 4.0, 3: 5.0}
     3 => {0: 1.0, 1: 4.0, 2: 5.0, 3: 6.0}
    }

(here the zeroth column is used to store the centroid count, and columns 1, 2 and 3 contain the data), but it turns out to be this:

    {
     0 => {}
     1 => {0: 1.0, 1: 4.0, 2: 5.0, 3: 6.0}
     2 => {}
     3 => {}
    }

And the result of the aggregating transpose should be:

    {
     0 => {1: 4.0}
     1 => {1: 9.0}
     2 => {1: 12.0}
     3 => {1: 15.0}
    }

I have referred to the book written by Andrew and Dmitriy, Apache Mahout: Beyond MapReduce <https://www.amazon.com/Apache-Mahout-MapReduce-Dmitriy-Lyubimov/dp/1523775785>. The aggregating transpose and other concepts are explained very nicely there, but I am unable to find any example where row keys are assigned new values. The Mahout Samsara manual, http://apache.github.io/mahout/doc/ScalaSparkBindings.html, also does not contain any such example. It would be great if I could get some reference to a solution of the mentioned issue.

Thanks
Parth Khatwani
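A hedged reading of the actual output above (my inference from these printouts, not something stated in the docs): collect materializes rows at their key index, so when all four rows carry key 1, slots 0, 2 and 3 of the in-core result stay empty and slot 1 appears to hold the last row written; the duplicate keys are only summed once an aggregating operation such as the transpose runs. A small cross-check sketch, reusing inCoreA and aggTranspose from the test above and using the in-core column sums as ground truth:

    // The aggregating transpose of a DRM whose row keys are all 1 should
    // leave the column sums of A in column 1 of the result; compare them
    // against the in-core column sums of the same data.
    println(inCoreA.colSums())           // per-column totals of the test data
    println(aggTranspose.collect(::, 1)) // column 1 of the aggregated transpose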
On Sat, Apr 15, 2017 at 12:13 AM, Andrew Palumbo <ap....@outlook.com> wrote:

+1

-------- Original message --------
From: Trevor Grant <trevor.d.gr...@gmail.com>
Date: 04/14/2017 11:40 (GMT-08:00)
To: dev@mahout.apache.org
Subject: Re: Trying to write the KMeans Clustering Using "Apache Mahout Samsara"

Parth and Dmitriy,

This is awesome. As a follow-on, can we work on getting this rolled into the algorithms framework?

Happy to work with you on this, Parth!

Trevor Grant
Data Scientist
https://github.com/rawkintrevo
http://stackexchange.com/users/3002022/rawkintrevo
http://trevorgrant.org

*"Fortunate is he, who is able to know the causes of things." -Virgil*

On Fri, Apr 14, 2017 at 1:27 PM, Dmitriy Lyubimov <dlie...@gmail.com> wrote:

I would think reassigning keys should work in most cases. The only exception is that technically the Spark contracts imply that the effect should be idempotent if a task is retried, which might be a problem in the specific scenario of the object tree coming out of the block cache, which can stay there and be retried again. But specifically w.r.t. this key assignment I don't see any problem, since the action obviously would be idempotent even if this code is run multiple times on the same (key, block) pair. This part should be good IMO.

On Fri, Apr 14, 2017 at 2:26 AM, KHATWANI PARTH BHARAT <h2016...@pilani.bits-pilani.ac.in> wrote:

@Dmitriy Sir,
In the KMeans code above, I think I am doing the following incorrectly: assigning the closest centroid index to the row keys of the DRM:

    // 11. iterating over the data matrix (in DrmLike[Int] format) to
    //     assign each data point to the closest centroid
    dataDrmX.mapBlock() {
      case (keys, block) =>
        for (row <- 0 until block.nrow) {
          var dataPoint = block(row, ::)

          // 12. findTheClosestCentriod finds the centroid closest to the
          //     data point specified by "dataPoint"
          val closesetIndex = findTheClosestCentriod(dataPoint, centriods)

          // 13. assigning the closest index to the key
          keys(row) = closesetIndex
        }
        keys -> block
    }

In step 12 I find the centroid closest to the current dataPoint, and in step 13 I assign the closesetIndex to the key of the corresponding row represented by dataPoint. I think I am doing step 13 incorrectly. Also, I am unable to find a proper reference for this in the reference links which you have mentioned above.

Thanks & Regards
Parth Khatwani
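One hedged sketch of how steps 11 to 13 might be repaired, per Dmitriy's remark further down about using the broadcast rather than a closure variable, and with the result of mapBlock captured (mapBlock returns a new DRM, it does not mutate dataDrmX); the names bcastC and assigned are mine, and the usual Samsara imports are assumed:

    // 10'. broadcast the current in-core centroid matrix
    val bcastC = drmBroadcast(centriods)

    // 11'-13'. reassign each row key to the index of the closest centroid,
    // reading the centroids from the broadcast inside the closure
    val assigned = dataDrmX.mapBlock() { case (keys, block) =>
      val c: Matrix = bcastC.value
      for (row <- 0 until block.nrow) {
        keys(row) = findTheClosestCentriod(block(row, ::), c)
      }
      keys -> block
    }
    // downstream steps (14, 15, ...) would then operate on `assigned`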
On Thu, Apr 13, 2017 at 6:24 PM, KHATWANI PARTH BHARAT <h2016...@pilani.bits-pilani.ac.in> wrote:

Dmitriy Sir,
I have created a GitHub branch having the initial KMeans code <https://github.com/parth2691/Spark_Mahout/tree/Dmitriy-Lyubimov>.

Thanks & Regards
Parth Khatwani

On Thu, Apr 13, 2017 at 3:19 AM, Andrew Palumbo <ap....@outlook.com> wrote:

+1 to creating a branch.

-------- Original message --------
From: Dmitriy Lyubimov <dlie...@gmail.com>
Date: 04/12/2017 11:25 (GMT-08:00)
To: dev@mahout.apache.org
Subject: Re: Trying to write the KMeans Clustering Using "Apache Mahout Samsara"

Can't say I can read this code well formatted that way...

It would seem to me that the code is not using the broadcast variable and instead is using a closure variable. That's the only thing I can immediately see by looking in the middle of it.

It would be better if you created a branch on GitHub for that code; that would allow for easy check-outs and comments.

-d

On Wed, Apr 12, 2017 at 10:29 AM, KHATWANI PARTH BHARAT <h2016...@pilani.bits-pilani.ac.in> wrote:

@Dmitriy Sir

I have completed the KMeans code as per the algorithm you have outlined above. My code is as follows. This code works fine till step number 10. In step 11 I am assigning the new centroid index to the corresponding row key of each data point in the matrix. I think I am doing something wrong in step 11; maybe I am using incorrect syntax. Can you help me find out what I am doing wrong?

    // start of main method
    def main(args: Array[String]) {
      // 1. initialize the Spark and Mahout context
      val conf = new SparkConf()
        .setAppName("DRMExample")
        .setMaster(args(0))
        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .set("spark.kryo.registrator",
          "org.apache.mahout.sparkbindings.io.MahoutKryoRegistrator")
      implicit val sc = new SparkDistributedContext(new SparkContext(conf))
      // 2. read the data file and save it in an RDD
      val lines = sc.textFile(args(1))

      // 3. convert the data read in as strings into arrays of Double
      val test = lines.map(line => line.split('\t').map(_.toDouble))

      // 4. add a column having value 1 to the array of doubles; this creates
      //    something like (1 | D), which will be used while calculating (1 | D)'
      val augumentedArray = test.map(addCentriodColumn _)

      // 5. convert the RDD of arrays of Double into an RDD of DenseVector
      val rdd = augumentedArray.map(dvec(_))

      // 6. convert the RDD into a DrmRdd
      val rddMatrixLike: DrmRdd[Int] = rdd.zipWithIndex.map { case (v, idx) => (idx.toInt, v) }

      // 7. convert the DrmRdd into a CheckpointedDrm[Int]
      val matrix = drmWrap(rddMatrixLike)

      // 8. separate out the column of ones created in step 4, for later use
      val oneVector = matrix(::, 0 until 1)

      // 9. final input data in DrmLike[Int] format
      val dataDrmX = matrix(::, 1 until 4)

      // 9. sampling to select the initial centroids
      val centriods = drmSampleKRows(dataDrmX, 2, false)
      centriods.size

      // 10. broadcasting the initial centroids
      val broadCastMatrix = drmBroadcast(centriods)

      // 11. iterating over the data matrix (in DrmLike[Int] format) to
      //     assign each data point to the closest centroid
      dataDrmX.mapBlock() {
        case (keys, block) =>
          for (row <- 0 until block.nrow) {
            var dataPoint = block(row, ::)

            // 12. findTheClosestCentriod finds the centroid closest to the
            //     data point specified by "dataPoint"
            val closesetIndex = findTheClosestCentriod(dataPoint, centriods)

            // 13. assigning the closest index to the key
            keys(row) = closesetIndex
          }
          keys -> block
      }

      // 14. calculating (1|D)
      val b = (oneVector cbind dataDrmX)

      // 15. aggregating transpose (1|D)'
      val bTranspose = (oneVector cbind dataDrmX).t
      // after step 15, bTranspose holds data in the following format:
      /* (n+1) x K, where n = dimension of the data points and K = number of
       * clusters; the zeroth row contains the count of points assigned to
       * each cluster (assuming 3d data points)
       */

      val nrows = b.nrow.toInt
      // 16. slicing the count vectors out
      val pointCountVectors = drmBroadcast(b(0 until 1, ::).collect(0, ::))
      val vectorSums = b(1 until nrows, ::)

      // 17. dividing the data-point sums by the count vector
      vectorSums.mapBlock() {
        case (keys, block) =>
          for (row <- 0 until block.nrow) {
            block(row, ::) /= pointCountVectors
          }
          keys -> block
      }

      // 18. slicing out the new centroids
      val newCentriods = vectorSums.t(::, 1 until centriods.size)

      // 19. iterate over the above code till the convergence criteria are met
    } // end of main method

    // method to find the centroid closest to a data point (vec: Vector in
    // the arguments)
    def findTheClosestCentriod(vec: Vector, matrix: Matrix): Int = {
      var index = 0
      var closest = Double.PositiveInfinity
      for (row <- 0 until matrix.nrow) {
        val squaredSum = ssr(vec, matrix(row, ::))
        val tempDist = Math.sqrt(squaredSum)
        if (tempDist < closest) {
          closest = tempDist
          index = row
        }
      }
      index
    }

    // calculating the sum of squared differences between two points (vectors)
    def ssr(a: Vector, b: Vector): Double = {
      ((a - b) ^= 2).sum
    }

    // method used to create (1|D)
    def addCentriodColumn(arg: Array[Double]): Array[Double] = {
      val newArr = new Array[Double](arg.length + 1)
      newArr(0) = 1.0
      for (i <- 0 until arg.size) {
        newArr(i + 1) = arg(i)
      }
      newArr
    }

Thanks & Regards
Parth Khatwani
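Step 19 above is only a comment; one possible shape for that outer loop, as a hedged sketch (runKMeans, maxIterations and tolerance are invented names, and recomputeCentroids stands for steps 11 to 18 collected into an in-core k x n matrix; the imports from the program above are assumed):

    // Driver loop for step 19: alternate assignment and recomputation until
    // the centroids stop moving or the iteration budget runs out.
    def runKMeans(dataDrmX: DrmLike[Int], c0: Matrix,
                  recomputeCentroids: (DrmLike[Int], Matrix) => Matrix,
                  maxIterations: Int = 25, tolerance: Double = 1e-6): Matrix = {
      var c = c0
      var moved = Double.PositiveInfinity
      var iter = 0
      while (iter < maxIterations && moved > tolerance) {
        val cNew = recomputeCentroids(dataDrmX, c)
        // movement = largest squared distance any centroid travelled,
        // reusing the ssr helper defined above
        moved = (0 until c.nrow).map(r => ssr(cNew(r, ::), c(r, ::))).max
        c = cNew
        iter += 1
      }
      c
    }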
On Mon, Apr 3, 2017 at 7:37 PM, KHATWANI PARTH BHARAT <h2016...@pilani.bits-pilani.ac.in> wrote:

---------- Forwarded message ----------
From: Dmitriy Lyubimov <dlie...@gmail.com>
Date: Fri, Mar 31, 2017 at 11:34 PM
Subject: Re: Trying to write the KMeans Clustering Using "Apache Mahout Samsara"
To: "dev@mahout.apache.org" <dev@mahout.apache.org>

ps1: this assumes row-wise construction of A based on a training set of m n-dimensional points.
ps2: since we are doing multiple passes over A, it may make sense to make sure it is committed to the Spark cache (by using the checkpoint api), if Spark is used.

On Fri, Mar 31, 2017 at 10:53 AM, Dmitriy Lyubimov <dlie...@gmail.com> wrote:

Here is the outline. For details of the APIs, please refer to the Samsara manual [2]; I will not be repeating it.

Assume your training data input is an m x n matrix A. For simplicity, let's assume it's a DRM with int row keys, i.e., DrmLike[Int].

Initialization:

First, classic k-means starts by selecting initial clusters, by sampling them out. You can do that by using the sampling api [1], thus forming a k x n in-memory matrix C (the current centroids). C is therefore of Mahout's Matrix type.

You then proceed by alternating between cluster assignments and recomputing the centroid matrix C till convergence, based on some test or simply limited by an epoch count budget; your choice.

Cluster assignments: here, we go over the current generation of A and recompute the centroid index for each row in A. Once we recompute the index, we put it into the row key. You can do that by assigning centroid indices to the keys of A using the operator mapBlock() (details in [2], [3], [4]). You also need to broadcast C in order to be able to access it in an efficient manner inside the mapBlock() closure. Examples of that are plenty in [2]. Essentially, in mapBlock, you'd reform the row keys to reflect the cluster index in C. While going over A, you'd have a "nearest neighbor" problem to solve for each row of A against the centroids C. This is the bulk of the computation really, and there are a few tricks there that can speed this step up in both exact and approximate manner, but you can start with a naive search.
Centroid recomputation:

Once you have assigned centroids to the keys of matrix A, you'd want to do an aggregating transpose of A to compute, essentially, the average of the rows of A grouped by the centroid key. The trick is to do a computation of (1|A)', which results in a matrix of the shape (counts | sums of cluster rows). This is the part I find difficult to explain without latex graphics.

In Samsara, the construction of (1|A)' corresponds to the DRM expression

    (1 cbind A).t

(again, see [2]). So when you compute, say,

    B = (1|A)',

then B is (n+1) x k, so each column contains a vector corresponding to a cluster 1..k. In such a column, the first element is the # of points in the cluster, and the rest of it corresponds to the sum of all points. So in order to arrive at an updated matrix C, we need to collect B into memory and slice the counters (first row) out from the rest of it.

So, to compute C:

    C <- B(2:, :), each row divided by B(1, :)

(watch out for empty clusters with 0 elements; they will cause lack of convergence and NaNs in the newly computed C).

This operation obviously uses subblocking and row-wise iteration over B, for which I am again making reference to [2].

[1] https://github.com/apache/mahout/blob/master/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala#L149
[2] Samsara manual, a bit dated but viable: http://apache.github.io/mahout/doc/ScalaSparkBindings.html
[3] scaladoc, again dated but largely viable for the purpose of this exercise: http://apache.github.io/mahout/0.10.1/docs/mahout-math-scala/index.htm
[4] mapBlock etc.: http://apache.github.io/mahout/0.10.1/docs/mahout-math-scala/index.html#org.apache.mahout.math.drm.RLikeDrmOps
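That recomputation step, written out as a hedged Samsara sketch: since the scalar cbind used earlier in this thread appends the ones column on the right, the counts land in the last row of the transpose rather than the first; all names are illustrative, and the row keys of A are assumed to already hold cluster indices 0..k-1:

    import org.apache.mahout.math.DenseMatrix

    // B = (A|1)': (n+1) x k, with per-cluster coordinate sums in rows
    // 0..n-1 and the per-cluster point counts in the last row. B is
    // small, so collect it into memory.
    val inCoreB = (A cbind 1.0).t.collect
    val n = inCoreB.nrow - 1
    val k = inCoreB.ncol

    // C: k x n, one centroid per row = coordinate sums divided by counts
    val C = new DenseMatrix(k, n)
    for (j <- 0 until k) {
      val cnt = inCoreB(n, j)
      if (cnt > 0)
        for (i <- 0 until n) C(j, i) = inCoreB(i, j) / cnt
      // cnt == 0 is the empty-cluster case warned about above: dividing
      // would produce NaNs, so keep the row as is (or re-seed it)
    }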
On Fri, Mar 31, 2017 at 9:54 AM, KHATWANI PARTH BHARAT <h2016...@pilani.bits-pilani.ac.in> wrote:

@Dmitriy, can you please again tell me the approach to move ahead?

Thanks
Parth Khatwani

On Fri, Mar 31, 2017 at 10:15 PM, KHATWANI PARTH BHARAT <h2016...@pilani.bits-pilani.ac.in> wrote:

Yes, I am unable to figure out the way ahead, like how to create the augmented matrix A := (0|D) which you have mentioned.

On Fri, Mar 31, 2017 at 10:10 PM, Dmitriy Lyubimov <dlie...@gmail.com> wrote:

Has my reply to your post on @user been a bit confusing?

On Fri, Mar 31, 2017 at 8:40 AM, KHATWANI PARTH BHARAT <h2016...@pilani.bits-pilani.ac.in> wrote:

Sir,
I am trying to write the k-means clustering algorithm using Mahout Samsara, but I am a bit confused about how to leverage the Distributed Row Matrix for the same. Can anybody help me with that?

Thanks
Parth Khatwani