Dong Wang created SPARK-29817:

             Summary: Missing persist on docs in 
                 Key: SPARK-29817
             Project: Spark
          Issue Type: Improvement
          Components: MLlib
    Affects Versions: 2.4.3
            Reporter: Dong Wang

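The RDD docs in mllib.clustering.LDAOptimizer (EMLDAOptimizer.initialize) is used twice: indirectly by verticesTMP.reduceByKey (through edges), and directly by docs.take(1). Since docs is not persisted, the second use recomputes it from its lineage, so it should be persisted.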
  override private[clustering] def initialize(
      docs: RDD[(Long, Vector)],
      lda: LDA): EMLDAOptimizer = {
    // ...
    val edges: RDD[Edge[TokenCount]] = docs.flatMap { case (docID: Long, termCounts: Vector) =>
      // Add edges for terms with non-zero counts.
      termCounts.asBreeze.activeIterator.filter(_._2 != 0.0).map { case (term, cnt) =>
        Edge(docID, term2index(term), cnt)
      }
    }

    // Create vertices.
    // Initially, we use random soft assignments of tokens to topics (random ...).
    val docTermVertices: RDD[(VertexId, TopicCounts)] = {
      val verticesTMP: RDD[(VertexId, TopicCounts)] =
        edges.mapPartitionsWithIndex { case (partIndex, partEdges) =>
          val random = new Random(partIndex + randomSeed)
          partEdges.flatMap { edge =>
            val gamma = normalize(BDV.fill[Double](k)(random.nextDouble()), 1.0)
            val sum = gamma * edge.attr
            Seq((edge.srcId, sum), (edge.dstId, sum))
          }
        }
      verticesTMP.reduceByKey(_ + _) // RDD dependency: verticesTMP - edges - docs. First use of docs
    }

    // Partition such that edges are grouped by document
    this.graph = Graph(docTermVertices, ...)
    this.k = k
    this.vocabSize = docs.take(1).head._2.size // Second use of docs
    // ...
  }
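The sketch below shows the persist pattern in isolation with a local SparkContext. It does not reproduce LDAOptimizer: PersistDocsSketch, countSourceEvaluations and the toy docs RDD are hypothetical stand-ins, and StorageLevel.MEMORY_AND_DISK is an assumed choice of storage level. A LongAccumulator counts how often the map function that produces docs runs, so the run without persist() should report more evaluations than the run with it. Without caching, docs.take(1) re-runs the lineage only for as much of the first partition as it needs, so the real overhead depends on how expensive the upstream lineage of docs is.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical, self-contained sketch: counts how often the code that produces a
// toy `docs` RDD runs when `docs` is used twice, with and without persist().
object PersistDocsSketch {
  def countSourceEvaluations(sc: SparkContext, persist: Boolean): Long = {
    // Counts evaluations of the map function below. Accumulator updates made inside
    // transformations are not exactly-once if tasks are retried.
    val evals = sc.longAccumulator("docEvaluations")

    // Toy stand-in for `docs`: (docID, term counts) pairs with a vocabulary of size 3.
    val docs: RDD[(Long, Array[Double])] =
      sc.parallelize(0L until 100L, numSlices = 4).map { id =>
        evals.add(1L)
        (id, Array(1.0, 0.0, 2.0))
      }
    // Assumed storage level; caching here lets the second use read cached partitions.
    if (persist) docs.persist(StorageLevel.MEMORY_AND_DISK)

    // First use: a shuffle over (term, count) pairs, loosely mirroring verticesTMP.reduceByKey.
    val termTotals = docs
      .flatMap { case (_, counts) =>
        counts.zipWithIndex.collect { case (c, term) if c != 0.0 => (term, c) }
      }
      .reduceByKey(_ + _)
    termTotals.count()

    // Second use: vocabulary size from the first document, mirroring docs.take(1).
    val vocabSize = docs.take(1).head._2.length
    require(vocabSize == 3)

    // Release the cached blocks once docs is no longer needed.
    if (persist) docs.unpersist()
    evals.sum
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("persist-docs-sketch"))
    try {
      val without = countSourceEvaluations(sc, persist = false)
      val withPersist = countSourceEvaluations(sc, persist = true)
      println(s"map evaluations without persist: $without, with persist: $withPersist")
    } finally {
      sc.stop()
    }
  }
}
```

If a change like this were made inside initialize, it would probably need a guard such as docs.getStorageLevel == StorageLevel.NONE before calling persist(), because Spark throws if an RDD that the caller already persisted is asked to switch to a different storage level; the method should then unpersist only an RDD it persisted itself.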

This issue was reported by our tool CacheCheck, which dynamically detects misuses of the persist()/unpersist() APIs.

This message was sent by Atlassian Jira
