I suspect the mllib code would suffer from non-deterministic parallelism behavior in its Lloyd iteration (as well as a memory overflow risk) in certain corner cases, such as when there are a lot of data points but very few clusters sought. Spark (for the right reasons) doesn't believe in sort-n-spill stuff, which means centroid recomputation may suffer from decreased or asymmetric parallelism after the groupByKey call, especially if cluster attribution counts end up heavily skewed for whatever reason.
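For what it's worth, the alternative shape -- riding a 1.0 count in the first slot of each attributed vector so that counts and sums combine in the same keyed reduction -- can be illustrated in-core with plain Scala collections. This is only a toy sketch with made-up points and hypothetical assignments, no Spark or Mahout types involved:

```scala
// Toy illustration: counts and vector sums aggregate together when each point
// is re-keyed by its assigned centroid with a 1.0 count prepended.
object KeyedSumSketch {
  // Four points in R^2 with hypothetical centroid assignments (made-up data).
  val points = Seq(Vector(1.0, 2.0), Vector(3.0, 4.0),
                   Vector(10.0, 10.0), Vector(20.0, 30.0))
  val assignments = Seq(0, 0, 1, 1)

  // Re-key each point by its centroid and prepend the count slot.
  val keyed = assignments.zip(points).map { case (c, p) => c -> (1.0 +: p) }

  // Summing the rows that share a key -- which is also what a transpose does
  // when row keys repeat -- yields per-centroid (count, sum) in one pass.
  val aggregated = keyed.groupBy(_._1).map { case (c, rows) =>
    c -> rows.map(_._2).reduce((a, b) => a.zip(b).map { case (x, y) => x + y })
  }

  // New centers: divide the sum slots by the count slot.
  val centers = aggregated.map { case (c, v) => c -> v.tail.map(_ / v.head) }

  def main(args: Array[String]): Unit =
    println(centers.toSeq.sortBy(_._1))
}
```

Because the (count, sum) pairs combine associatively, the reduction can fan out over many tasks rather than concentrating each centroid's points in a single task.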
E.g. if you have 1 billion points and two centroids, it's my understanding that post-attribution centroid reduction will create only two tasks, each processing no fewer than 500 million attributed points, even if cluster capacity quite adequately offers 500 tasks for this job.

I have a sketch of a Lloyd iteration in Spark Bindings that computes the centroid matrix in the backend in a way that is free of the aforementioned limitation. The trick is that transposition is in fact an aggregate operation as well when row keys contain duplicates. So all it needs to do is re-key points according to their attribution and then perform a transpose. Point counts are kept in the first position of the attributed vectors, so they aggregate correctly along with the vector sums. [1]

Despite that improvement, my implementation still fails to scale in k (i.e., mapBlock closure running time will be O(k) instead of the ideally wanted O(1)).

Regarding dp-means: unlike ||, the default code for dp-means does not assume parallelization. Is there an adaptation to a shared-nothing computation of that?

[1]

  /**
   * @param drmData data points, one per row
   * @param inCoreC matrix with columns representing current cluster centers
   * @return updated centroid matrix
   */
  private def lloydIteration[K: ClassTag](drmData: DrmLike[K], inCoreC: Matrix,
                                          dfunc: DistFunc = eucl): DrmLike[Int] = {

    // TODO: this is still tightly coupled to Spark
    implicit val sc = drmData.rdd.sparkContext

    val n = drmData.ncol
    val numCentroids = inCoreC.ncol
    val bcastC = drmBroadcast(inCoreC)

    val drmCentroids = drmData

      // Output the assigned centroid index as the key for each row point.
      .mapBlock(ncol = n + 1) { case (keys, block) =>

        val assignedCentroids = for (r <- 0 until block.nrow) yield {
          val row = block(r, ::)
          val inCoreC = bcastC: Matrix

          // Find closest centroid -- TODO: this needs triangle inequality in case of
          // euclidean distance
          (0 until numCentroids).view.map(ci => dfunc(row, inCoreC(::, ci))).minValueIndex()
        }

        // Grow block with a vector of 1.0 on the left -- this will serve as the denominator
        // of the new centroid data, to accumulate counts
        val newBlock = block.like(block.nrow, n + 1)
        newBlock(::, 0) := 1.0
        newBlock(::, 1 to n) := block

        // Emit the grown block (not the original one), keyed by attribution.
        assignedCentroids.toArray -> newBlock
      }

      // Transposition aggregates vectors by centroid keys
      .t(::, 0 until numCentroids)
      .checkpoint()

    // Collect centroid sizes into a vector and re-broadcast
    val vCSizes = drmBroadcast(drmCentroids(0 to 0, ::).collect(0, ::))

    // Finalize the centroid matrix. First, slice off the counter row.
    drmCentroids(1 to n, ::)

      // Then divide each centroid sum by its count, thus getting the center.
      .mapBlock() { case (keys, block) =>
        for (col <- 0 until numCentroids) block(::, col) /= vCSizes.value(col)
        keys -> block
      }
  }

On Wed, Apr 2, 2014 at 12:03 AM, Dmitriy Lyubimov <dlie...@gmail.com> wrote:

> Considering porting implementation [1] and paper for KMeans || for
> Bindings.
>
> This seems like another method to map fairly nicely.
>
> The problem I am contemplating is ||-initialization, and in particular,
> centroid storage. That particular implementation assumes centroids could be
> kept in memory in front.
>
> (1) Question is, is it a dangerous idea? It doesn't seem like it
> particularly is, since it is unlikely people would want k > 1e+6. Another
> thing: centers seem to be passed in via a closure attribute (i.e. a
> java-serialized, array-backed matrix). However, with Bindings it is quite
> possible to keep centers at the back as a matrix.
>
> (2) Obviously, Lloyd iterations are not terribly accurate. The || and ++
> versions mostly speed things up. Is there any better-than-Lloyd accuracy
> preference?
>
> [1]
> https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala