[ 
https://issues.apache.org/jira/browse/SPARK-22075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16172859#comment-16172859
 ] 

zhengruifeng edited comment on SPARK-22075 at 9/20/17 9:48 AM:
---------------------------------------------------------------

-Same issue seems exist in {{Pregel}}, each call of {{connectedComponents}} 
will generate two new cached rdds

{code}
scala> import org.apache.spark.graphx.GraphLoader
import org.apache.spark.graphx.GraphLoader

scala> val graph = GraphLoader.edgeListFile(sc, 
"data/graphx/followers.txt").persist()
graph: org.apache.spark.graphx.Graph[Int,Int] = 
org.apache.spark.graphx.impl.GraphImpl@3c20abd6

scala> sc.getPersistentRDDs
res0: scala.collection.Map[Int,org.apache.spark.rdd.RDD[_]] = Map(8 -> 
VertexRDD, VertexRDD MapPartitionsRDD[8] at mapPartitions at 
VertexRDD.scala:345, 2 -> GraphLoader.edgeListFile - edges 
(data/graphx/followers.txt), EdgeRDD, EdgeRDD MapPartitionsRDD[2] at 
mapPartitionsWithIndex at GraphLoader.scala:75)

scala> val cc = graph.connectedComponents()
17/09/20 15:33:39 WARN BlockManager: Block rdd_11_0 already exists on this 
machine; not re-adding it
cc: org.apache.spark.graphx.Graph[org.apache.spark.graphx.VertexId,Int] = 
org.apache.spark.graphx.impl.GraphImpl@2cc3b0a7

scala> sc.getPersistentRDDs
res1: scala.collection.Map[Int,org.apache.spark.rdd.RDD[_]] = Map(8 -> 
VertexRDD, VertexRDD MapPartitionsRDD[8] at mapPartitions at 
VertexRDD.scala:345, 2 -> GraphLoader.edgeListFile - edges 
(data/graphx/followers.txt), EdgeRDD, EdgeRDD MapPartitionsRDD[2] at 
mapPartitionsWithIndex at GraphLoader.scala:75, 38 -> EdgeRDD 
ZippedPartitionsRDD2[38] at zipPartitions at ReplicatedVertexView.scala:112, 32 
-> VertexRDD ZippedPartitionsRDD2[32] at zipPartitions at 
VertexRDDImpl.scala:156)

scala> val cc = graph.connectedComponents()
cc: org.apache.spark.graphx.Graph[org.apache.spark.graphx.VertexId,Int] = 
org.apache.spark.graphx.impl.GraphImpl@69abff1d

scala> sc.getPersistentRDDs
res2: scala.collection.Map[Int,org.apache.spark.rdd.RDD[_]] = Map(38 -> EdgeRDD 
ZippedPartitionsRDD2[38] at zipPartitions at ReplicatedVertexView.scala:112, 70 
-> VertexRDD ZippedPartitionsRDD2[70] at zipPartitions at 
VertexRDDImpl.scala:156, 2 -> GraphLoader.edgeListFile - edges 
(data/graphx/followers.txt), EdgeRDD, EdgeRDD MapPartitionsRDD[2] at 
mapPartitionsWithIndex at GraphLoader.scala:75, 32 -> VertexRDD 
ZippedPartitionsRDD2[32] at zipPartitions at VertexRDDImpl.scala:156, 76 -> 
EdgeRDD ZippedPartitionsRDD2[76] at zipPartitions at 
ReplicatedVertexView.scala:112, 8 -> VertexRDD, VertexRDD MapPartitionsRDD[8] 
at mapPartitions at VertexRDD.scala:345)

{code}-

Pregel is OK, the intermediate rdds are unpersisted out of checkpointer


was (Author: podongfeng):
Same issue seems exist in {{Pregel}}, each call of {{connectedComponents}} will 
generate two new cached rdds

{code}
scala> import org.apache.spark.graphx.GraphLoader
import org.apache.spark.graphx.GraphLoader

scala> val graph = GraphLoader.edgeListFile(sc, 
"data/graphx/followers.txt").persist()
graph: org.apache.spark.graphx.Graph[Int,Int] = 
org.apache.spark.graphx.impl.GraphImpl@3c20abd6

scala> sc.getPersistentRDDs
res0: scala.collection.Map[Int,org.apache.spark.rdd.RDD[_]] = Map(8 -> 
VertexRDD, VertexRDD MapPartitionsRDD[8] at mapPartitions at 
VertexRDD.scala:345, 2 -> GraphLoader.edgeListFile - edges 
(data/graphx/followers.txt), EdgeRDD, EdgeRDD MapPartitionsRDD[2] at 
mapPartitionsWithIndex at GraphLoader.scala:75)

scala> val cc = graph.connectedComponents()
17/09/20 15:33:39 WARN BlockManager: Block rdd_11_0 already exists on this 
machine; not re-adding it
cc: org.apache.spark.graphx.Graph[org.apache.spark.graphx.VertexId,Int] = 
org.apache.spark.graphx.impl.GraphImpl@2cc3b0a7

scala> sc.getPersistentRDDs
res1: scala.collection.Map[Int,org.apache.spark.rdd.RDD[_]] = Map(8 -> 
VertexRDD, VertexRDD MapPartitionsRDD[8] at mapPartitions at 
VertexRDD.scala:345, 2 -> GraphLoader.edgeListFile - edges 
(data/graphx/followers.txt), EdgeRDD, EdgeRDD MapPartitionsRDD[2] at 
mapPartitionsWithIndex at GraphLoader.scala:75, 38 -> EdgeRDD 
ZippedPartitionsRDD2[38] at zipPartitions at ReplicatedVertexView.scala:112, 32 
-> VertexRDD ZippedPartitionsRDD2[32] at zipPartitions at 
VertexRDDImpl.scala:156)

scala> val cc = graph.connectedComponents()
cc: org.apache.spark.graphx.Graph[org.apache.spark.graphx.VertexId,Int] = 
org.apache.spark.graphx.impl.GraphImpl@69abff1d

scala> sc.getPersistentRDDs
res2: scala.collection.Map[Int,org.apache.spark.rdd.RDD[_]] = Map(38 -> EdgeRDD 
ZippedPartitionsRDD2[38] at zipPartitions at ReplicatedVertexView.scala:112, 70 
-> VertexRDD ZippedPartitionsRDD2[70] at zipPartitions at 
VertexRDDImpl.scala:156, 2 -> GraphLoader.edgeListFile - edges 
(data/graphx/followers.txt), EdgeRDD, EdgeRDD MapPartitionsRDD[2] at 
mapPartitionsWithIndex at GraphLoader.scala:75, 32 -> VertexRDD 
ZippedPartitionsRDD2[32] at zipPartitions at VertexRDDImpl.scala:156, 76 -> 
EdgeRDD ZippedPartitionsRDD2[76] at zipPartitions at 
ReplicatedVertexView.scala:112, 8 -> VertexRDD, VertexRDD MapPartitionsRDD[8] 
at mapPartitions at VertexRDD.scala:345)

{code}

> GBTs forgot to unpersist datasets cached by Checkpointer
> --------------------------------------------------------
>
>                 Key: SPARK-22075
>                 URL: https://issues.apache.org/jira/browse/SPARK-22075
>             Project: Spark
>          Issue Type: Bug
>          Components: ML
>    Affects Versions: 2.3.0
>            Reporter: zhengruifeng
>
> {{PeriodicRDDCheckpointer}} will automatically persist the last 3 datasets 
> called by {{PeriodicRDDCheckpointer.update}}.
> In GBTs, the last 3 intermediate rdds are still cached after {{fit()}}
> {code}
> scala> val dataset = 
> spark.read.format("libsvm").load("./data/mllib/sample_kmeans_data.txt")
> dataset: org.apache.spark.sql.DataFrame = [label: double, features: vector]   
>   
> scala> dataset.persist()
> res0: dataset.type = [label: double, features: vector]
> scala> dataset.count
> res1: Long = 6
> scala> sc.getPersistentRDDs
> res2: scala.collection.Map[Int,org.apache.spark.rdd.RDD[_]] =
> Map(8 -> *FileScan libsvm [label#0,features#1] Batched: false, Format: 
> LibSVM, Location: 
> InMemoryFileIndex[file:/Users/zrf/.dev/spark-2.2.0-bin-hadoop2.7/data/mllib/sample_kmeans_data.txt],
>  PartitionFilters: [], PushedFilters: [], ReadSchema: 
> struct<label:double,features:struct<type:tinyint,size:int,indices:array<int>,values:array<double>>>
>  MapPartitionsRDD[8] at persist at <console>:26)
> scala> import org.apache.spark.ml.regression._
> import org.apache.spark.ml.regression._
> scala> val model = gbt.fit(dataset)
> <console>:28: error: not found: value gbt
>        val model = gbt.fit(dataset)
>                    ^
> scala> val gbt = new GBTRegressor()
> gbt: org.apache.spark.ml.regression.GBTRegressor = gbtr_da1fe371a25e
> scala> val model = gbt.fit(dataset)
> 17/09/20 14:05:33 WARN DecisionTreeMetadata: DecisionTree reducing maxBins 
> from 32 to 6 (= number of training instances)
> 17/09/20 14:05:35 WARN DecisionTreeMetadata: DecisionTree reducing maxBins 
> from 32 to 6 (= number of training instances)
> 17/09/20 14:05:35 WARN DecisionTreeMetadata: DecisionTree reducing maxBins 
> from 32 to 6 (= number of training instances)
> 17/09/20 14:05:35 WARN DecisionTreeMetadata: DecisionTree reducing maxBins 
> from 32 to 6 (= number of training instances)
> 17/09/20 14:05:35 WARN DecisionTreeMetadata: DecisionTree reducing maxBins 
> from 32 to 6 (= number of training instances)
> 17/09/20 14:05:35 WARN DecisionTreeMetadata: DecisionTree reducing maxBins 
> from 32 to 6 (= number of training instances)
> 17/09/20 14:05:36 WARN DecisionTreeMetadata: DecisionTree reducing maxBins 
> from 32 to 6 (= number of training instances)
> 17/09/20 14:05:36 WARN DecisionTreeMetadata: DecisionTree reducing maxBins 
> from 32 to 6 (= number of training instances)
> 17/09/20 14:05:36 WARN DecisionTreeMetadata: DecisionTree reducing maxBins 
> from 32 to 6 (= number of training instances)
> 17/09/20 14:05:36 WARN DecisionTreeMetadata: DecisionTree reducing maxBins 
> from 32 to 6 (= number of training instances)
> 17/09/20 14:05:36 WARN DecisionTreeMetadata: DecisionTree reducing maxBins 
> from 32 to 6 (= number of training instances)
> 17/09/20 14:05:36 WARN DecisionTreeMetadata: DecisionTree reducing maxBins 
> from 32 to 6 (= number of training instances)
> 17/09/20 14:05:37 WARN DecisionTreeMetadata: DecisionTree reducing maxBins 
> from 32 to 6 (= number of training instances)
> 17/09/20 14:05:37 WARN DecisionTreeMetadata: DecisionTree reducing maxBins 
> from 32 to 6 (= number of training instances)
> 17/09/20 14:05:37 WARN DecisionTreeMetadata: DecisionTree reducing maxBins 
> from 32 to 6 (= number of training instances)
> 17/09/20 14:05:37 WARN DecisionTreeMetadata: DecisionTree reducing maxBins 
> from 32 to 6 (= number of training instances)
> 17/09/20 14:05:37 WARN DecisionTreeMetadata: DecisionTree reducing maxBins 
> from 32 to 6 (= number of training instances)
> 17/09/20 14:05:37 WARN DecisionTreeMetadata: DecisionTree reducing maxBins 
> from 32 to 6 (= number of training instances)
> 17/09/20 14:05:38 WARN DecisionTreeMetadata: DecisionTree reducing maxBins 
> from 32 to 6 (= number of training instances)
> 17/09/20 14:05:38 WARN DecisionTreeMetadata: DecisionTree reducing maxBins 
> from 32 to 6 (= number of training instances)
> model: org.apache.spark.ml.regression.GBTRegressionModel = GBTRegressionModel 
> (uid=gbtr_da1fe371a25e) with 20 trees
> scala> sc.getPersistentRDDs
> res3: scala.collection.Map[Int,org.apache.spark.rdd.RDD[_]] =
> Map(322 -> MapPartitionsRDD[322] at mapPartitions at 
> GradientBoostedTrees.scala:134, 307 -> MapPartitionsRDD[307] at mapPartitions 
> at GradientBoostedTrees.scala:134, 8 -> *FileScan libsvm [label#0,features#1] 
> Batched: false, Format: LibSVM, Location: 
> InMemoryFileIndex[file:/Users/zrf/.dev/spark-2.2.0-bin-hadoop2.7/data/mllib/sample_kmeans_data.txt],
>  PartitionFilters: [], PushedFilters: [], ReadSchema: 
> struct<label:double,features:struct<type:tinyint,size:int,indices:array<int>,values:array<double>>>
>  MapPartitionsRDD[8] at persist at <console>:26, 292 -> MapPartitionsRDD[292] 
> at mapPartitions at GradientBoostedTrees.scala:134)
> scala> sc.getPersistentRDDs.size
> res4: Int = 4
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to