The implementation of SVDPlusPlus shows that it produces two new graphs in
each iteration, and each of them is also cached in memory. As the iterations
go on, more and more graphs stay cached until the job runs out of memory. So
I think the old graphs that will no longer be used need to be unpersisted,
and I added a few lines of code to do that. A small toy example of the
build-up comes first, then the full run method with my changes.
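
To show what I mean by the build-up outside of SVD++, here is a minimal toy
sketch (not the real SVD++ code; the repeated mapVertices call just stands in
for the per-iteration graph transformations):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, Graph}

// Toy illustration: deriving and caching a new Graph in every iteration
// without unpersisting the previous one never releases the old cached graphs.
val sc = new SparkContext(new SparkConf().setAppName("cache-buildup").setMaster("local[*]"))
val edges = sc.parallelize(Seq(Edge(1L, 2L, 1.0), Edge(2L, 3L, 2.0)))
var g = Graph.fromEdges(edges, 0.0).cache()
for (i <- 0 until 10) {
  g = g.mapVertices((_, v) => v + 1.0).cache()  // brand-new cached graph each pass
  g.vertices.count()                            // action to materialize it; nothing releases the previous graph
}
// none of the earlier graphs were unpersisted, so their cached partitions
// pile up (visible on the storage page of the web UI)
sc.stop()

Here is the full method with the lines I added: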
def run(edges: RDD[Edge[Double]], conf: Conf)
    : (Graph[(DoubleMatrix, DoubleMatrix, Double, Double), Double], Double) =
  {

    // Generate default vertex attribute
    def defaultF(rank: Int): (DoubleMatrix, DoubleMatrix, Double, Double) = {
      val v1 = new DoubleMatrix(rank)
      val v2 = new DoubleMatrix(rank)
      for (i <- 0 until rank) {
        v1.put(i, Random.nextDouble())
        v2.put(i, Random.nextDouble())
      }
      (v1, v2, 0.0, 0.0)
    }

    // calculate global rating mean
    edges.cache()

    val (rs, rc) = edges.map(e => (e.attr, 1L))
      .reduce((a, b) => (a._1 + b._1, a._2 + b._2))
    val u = rs / rc

    var preG: Graph[(DoubleMatrix, DoubleMatrix, Double, Double), Double] = null // (added)

    // construct graph
    var g = Graph.fromEdges(edges, defaultF(conf.rank)).cache()
    preG = g // (added)

    // Calculate initial bias and norm
    val t0 = g.mapReduceTriplets(
      et => Iterator((et.srcId, (1L, et.attr)), (et.dstId, (1L, et.attr))),
      (g1: (Long, Double), g2: (Long, Double)) => (g1._1 + g2._1, g1._2 + g2._2))

    g = g.outerJoinVertices(t0) {
      (vid: VertexId, vd: (DoubleMatrix, DoubleMatrix, Double, Double),
       msg: Option[(Long, Double)]) =>
        (vd._1, vd._2, msg.get._2 / msg.get._1, 1.0 / scala.math.sqrt(msg.get._1))
    }

    def mapTrainF(conf: Conf, u: Double)
        (et: EdgeTriplet[(DoubleMatrix, DoubleMatrix, Double, Double), Double])
      : Iterator[(VertexId, (DoubleMatrix, DoubleMatrix, Double))] = {
      val (usr, itm) = (et.srcAttr, et.dstAttr)
      val (p, q) = (usr._1, itm._1)
      var pred = u + usr._3 + itm._3 + q.dot(usr._2)
      pred = math.max(pred, conf.minVal)
      pred = math.min(pred, conf.maxVal)
      val err = et.attr - pred
      val updateP = q.mul(err)
        .subColumnVector(p.mul(conf.gamma7))
        .mul(conf.gamma2)
      val updateQ = usr._2.mul(err)
        .subColumnVector(q.mul(conf.gamma7))
        .mul(conf.gamma2)
      val updateY = q.mul(err * usr._4)
        .subColumnVector(itm._2.mul(conf.gamma7))
        .mul(conf.gamma2)
      Iterator(
        (et.srcId, (updateP, updateY, (err - conf.gamma6 * usr._3) * conf.gamma1)),
        (et.dstId, (updateQ, updateY, (err - conf.gamma6 * itm._3) * conf.gamma1)))
    }

    for (i <- 0 until conf.maxIters) {

      // Phase 1, calculate pu + |N(u)|^(-0.5)*sum(y) for user nodes
      g.cache()
      preG.unpersistVertices(blocking = false) // (added)
      preG.edges.unpersist(blocking = false)   // (added)
      preG = g                                 // (added)
      val t1 = g.mapReduceTriplets(
        et => Iterator((et.srcId, et.dstAttr._2)),
        (g1: DoubleMatrix, g2: DoubleMatrix) => g1.addColumnVector(g2))

      g = g.outerJoinVertices(t1) {
        (vid: VertexId, vd: (DoubleMatrix, DoubleMatrix, Double, Double),
         msg: Option[DoubleMatrix]) =>
          if (msg.isDefined) (vd._1, vd._1
            .addColumnVector(msg.get.mul(vd._4)), vd._3, vd._4) else vd
      }

      // Phase 2, update p for user nodes and q, y for item nodes
      g.cache()
      preG.unpersistVertices(blocking = false) // (added)
      preG.edges.unpersist(blocking = false)   // (added)
      preG = g                                 // (added)
      val t2 = g.mapReduceTriplets(
        mapTrainF(conf, u),
        (g1: (DoubleMatrix, DoubleMatrix, Double), g2: (DoubleMatrix, DoubleMatrix, Double)) =>
          (g1._1.addColumnVector(g2._1), g1._2.addColumnVector(g2._2), g1._3 + g2._3))

      g = g.outerJoinVertices(t2) {
        (vid: VertexId,
         vd: (DoubleMatrix, DoubleMatrix, Double, Double),
         msg: Option[(DoubleMatrix, DoubleMatrix, Double)]) =>
          (vd._1.addColumnVector(msg.get._1), vd._2.addColumnVector(msg.get._2),
            vd._3 + msg.get._3, vd._4)
      }

    }

    // calculate error on training set
    def mapTestF(conf: Conf, u: Double)
        (et: EdgeTriplet[(DoubleMatrix, DoubleMatrix, Double, Double), Double])
      : Iterator[(VertexId, Double)] =
    {
      val (usr, itm) = (et.srcAttr, et.dstAttr)
      val (p, q) = (usr._1, itm._1)
      var pred = u + usr._3 + itm._3 + q.dot(usr._2)
      pred = math.max(pred, conf.minVal)
      pred = math.min(pred, conf.maxVal)
      val err = (et.attr - pred) * (et.attr - pred)
      Iterator((et.dstId, err))
    }
    g.cache()
    val t3 = g.mapReduceTriplets(mapTestF(conf, u),
      (g1: Double, g2: Double) => g1 + g2)
    g = g.outerJoinVertices(t3) {
      (vid: VertexId, vd: (DoubleMatrix, DoubleMatrix, Double, Double),
       msg: Option[Double]) =>
        if (msg.isDefined) (vd._1, vd._2, vd._3, msg.get) else vd
    }

    (g, u)
  }

The lines marked with "// (added)" are the code I added. I hoped that in each
iteration, once the new graph was cached, the old graph would be unpersisted.
However, it looks like both the new graph and the old graph end up
unpersisted, because the time spent in outerJoinVertices and
mapReduceTriplets grows longer and longer as the iterations go on. So my
guess is that no graph stays in memory and everything has to be recomputed in
each iteration. In addition, in the logs I see "WARN
impl.ShippableVertexPartitionOps: Joining two VertexPartitions with different
indexes is slow".
So how can I correctly unpersist the old graph?
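
Is something like the pattern below the right direction? This is only a
sketch of what I have in mind (the materialize helper is my own name, not an
existing GraphX API; I'm assuming the new graph has to be forced with an
action before the old one is unpersisted, since the graph operations are
lazy):

import org.apache.spark.graphx.Graph

// Force both RDDs of a graph into the cache so that unpersisting the
// previous graph cannot trigger recomputation later.
def materialize(g: Graph[_, _]): Unit = {
  g.vertices.count()
  g.edges.count()
}

// inside the iteration loop, after deriving the next graph `newG` from `g`:
//   newG.cache()
//   materialize(newG)                       // newG is now fully cached
//   g.unpersistVertices(blocking = false)   // drop the old graph
//   g.edges.unpersist(blocking = false)
//   g = newG
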
Thanks


