I suspect the mllib code would suffer from non-deterministic parallelism
behavior in its Lloyd iteration (as well as memory overflow risk) in
certain corner cases, such as when there are a lot of data points but
very few clusters sought. Spark (for good reasons) doesn't believe in
sort-and-spill stuff, which means centroid recomputation may suffer from
decreased or asymmetric parallelism after the groupByKey call, especially
if cluster attribution counts end up heavily skewed for whatever reason.

E.g. if you have 1 billion points and two centroids, it's my understanding
that the post-attribution centroid reduction will create only two tasks,
each processing no fewer than 500 million attributed points, even if
cluster capacity quite adequately offers 500 tasks for this job.
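The skew-free alternative to grouping all attributed points per centroid is a map-side combine: each partition folds its own points into one partial (count, sum) per centroid, and the small partials are then merged pairwise, so parallelism is bounded by the number of partitions rather than by k. A minimal sketch in plain Scala (no Spark, hypothetical helper names) of the combine and merge functions one would hand to something like aggregateByKey:

```scala
// Plain-Scala illustration of map-side combining for centroid recomputation.
// Each partition is reduced to at most k small (count, sum) partials; the
// merge step is commutative and associative, so it never sees raw points.

type Point = Array[Double]

// Per-partition combiner: fold a partition's (centroidIndex, point) pairs
// into per-centroid (count, runningSum) partials.
def partialStats(partition: Seq[(Int, Point)], n: Int): Map[Int, (Long, Point)] =
  partition.foldLeft(Map.empty[Int, (Long, Point)]) { case (acc, (ci, p)) =>
    val (cnt, sum) = acc.getOrElse(ci, (0L, Array.fill(n)(0.0)))
    for (i <- 0 until n) sum(i) += p(i)
    acc.updated(ci, (cnt + 1, sum))
  }

// Merge two partials -- this is the reduce side of the aggregation.
def merge(a: Map[Int, (Long, Point)], b: Map[Int, (Long, Point)]): Map[Int, (Long, Point)] =
  b.foldLeft(a) { case (acc, (ci, (cnt, sum))) =>
    acc.get(ci) match {
      case Some((c0, s0)) =>
        acc.updated(ci, (c0 + cnt, s0.zip(sum).map { case (x, y) => x + y }))
      case None => acc.updated(ci, (cnt, sum))
    }
  }

// New centroids = per-cluster sum / count.
def centroids(stats: Map[Int, (Long, Point)]): Map[Int, Point] =
  stats.map { case (ci, (cnt, sum)) => ci -> sum.map(_ / cnt) }
```

With 1 billion points, two centroids, and 500 partitions, this runs 500 partial-stats tasks followed by a merge of 500 tiny maps, instead of two tasks of 500 million points each.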


I have a sketch of a Lloyd iteration in Spark Bindings that computes the
centroid matrix on the backend in a way that is free of the aforementioned
limitation. The trick is that transposition is in fact an aggregating
operation as well when the row-key vector contains duplicates. So all it
needs to do is re-key points according to their attribution and then
perform a transpose. Point counts are kept in the first position of the
attributed vectors, so they aggregate correctly along with the vector
sums. [1]

Despite that improvement, my implementation still suffers from a failure to
scale in k (i.e. the mapBlock closure running time will be O(k) per row
instead of the ideally wanted O(1)).
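One partial mitigation (a sketch, not part of the quoted code): with Euclidean distance, the per-centroid loop in the closure can be replaced by one dense matrix product, since ||x - c||^2 = ||x||^2 - 2 x.c + ||c||^2. This doesn't change the O(k) asymptotics, but it turns the k separate column-view distance calls into a single block-times-centroids multiply that the math backend can vectorize. Illustrated with plain Scala arrays (hypothetical helper names):

```scala
// Sketch: squared Euclidean distances of every row in a block to every
// centroid, via the expansion ||x - c||^2 = ||x||^2 - 2 x.c + ||c||^2.
// block: m x n rows of points; cT: k x n rows of centroids.
def sqDistances(block: Array[Array[Double]], cT: Array[Array[Double]]): Array[Array[Double]] = {
  def sqNorm(v: Array[Double]) = v.map(x => x * x).sum
  val rowNorms = block.map(sqNorm)   // ||x||^2, one per point
  val cNorms = cT.map(sqNorm)        // ||c||^2, one per centroid
  Array.tabulate(block.length, cT.length) { (r, ci) =>
    // The dot products form the matrix product block %*% C in a real backend.
    val dot = block(r).zip(cT(ci)).map { case (x, y) => x * y }.sum
    rowNorms(r) - 2.0 * dot + cNorms(ci)
  }
}

// Assignment is then an argmin over each row of the distance matrix.
def assign(dists: Array[Array[Double]]): Array[Int] =
  dists.map(row => row.indexOf(row.min))
```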


Regarding dp-means: unlike ||, the default code for dp-means does not
assume parallelization. Is there an adaptation to a shared-nothing
computation of that?

[1]
/**
  * A single Lloyd iteration.
  *
  * @param drmData data points as a distributed row matrix
  * @param inCoreC matrix with columns representing current cluster centers
  * @param dfunc   distance function (defaults to Euclidean)
  * @return updated centroid matrix
  */
  private def lloydIteration[K: ClassTag](drmData: DrmLike[K], inCoreC: Matrix,
                                          dfunc: DistFunc = eucl): DrmLike[Int] = {

    // TODO: this is still tightly coupled to Spark
    implicit val sc = drmData.rdd.sparkContext

    val n = drmData.ncol
    val numCentroids = inCoreC.ncol
    val bcastC = drmBroadcast(inCoreC)

    val drmCentroids = drmData

        // Output the assigned centroid index as the key for each row point.
        .mapBlock(ncol = n + 1) {

      case (keys, block) =>

        val assignedCentroids = for (r <- 0 until block.nrow) yield {
          val row = block(r, ::)
          val inCoreC = bcastC: Matrix

          // Find closest centroid -- TODO: this needs triangle inequality
          // in case of euclidean distance
          (0 until numCentroids).view.map(ci => dfunc(row, inCoreC(::, ci))).minValueIndex()
        }

        // Grow block with a vector of 1.0 on the left -- this will serve as
        // the denominator of the new centroid data, to accumulate counts
        val newBlock = block.like(block.nrow, n + 1)
        newBlock(::, 0) := 1.0
        newBlock(::, 1 to n) := block

        assignedCentroids.toArray -> newBlock
    }

        // Transposition aggregates vectors by centroid keys
        .t(::, 0 until numCentroids)

        .checkpoint()

    // Collect centroid sizes into a vector and re-broadcast
    val vCSizes = drmBroadcast(drmCentroids(0 to 0, ::).collect(0, ::))

    // Finalize centroid matrix. First, slice off the counter row
    drmCentroids(1 to n, ::)

        // Then divide each centroid sum by its count, thus getting the center.
        .mapBlock() {
      case (keys, block) =>
        for (col <- 0 until numCentroids) block(::, col) /= vCSizes.value(col)
        keys -> block
    }

  }




On Wed, Apr 2, 2014 at 12:03 AM, Dmitriy Lyubimov <dlie...@gmail.com> wrote:

> Considering porting implementation [1] and paper for KMeans || for
> Bindings.
>
> This seems like another method to map fairly nicely.
>
> The problem I am contemplating is the ||-initialization, and in particular
> centroid storage. That particular implementation assumes centroids can be
> kept in memory on the front end.
>
> (1) The question is, is that a dangerous idea? It doesn't seem like it
> particularly is, since it's unlikely people would want k > 1e+6. Another
> thing: centers seem to be passed in via a closure attribute (i.e. a
> java-serialized, array-backed matrix). However, with Bindings it is quite
> possible to keep the centers at the back as a matrix.
>
> (2) Obviously, Lloyd iterations are not terribly accurate. The || and ++
> versions mostly speed things up. Is there any better-than-Lloyd accuracy
> preference?
>
>
> [1]
> https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
>
>
>
