[ https://issues.apache.org/jira/browse/SPARK-22075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16172859#comment-16172859 ]
zhengruifeng edited comment on SPARK-22075 at 9/20/17 9:48 AM:
---------------------------------------------------------------

-The same issue seems to exist in {{Pregel}}: each call of {{connectedComponents}} generates two new cached RDDs.
{code}
scala> import org.apache.spark.graphx.GraphLoader
import org.apache.spark.graphx.GraphLoader

scala> val graph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt").persist()
graph: org.apache.spark.graphx.Graph[Int,Int] = org.apache.spark.graphx.impl.GraphImpl@3c20abd6

scala> sc.getPersistentRDDs
res0: scala.collection.Map[Int,org.apache.spark.rdd.RDD[_]] = Map(8 -> VertexRDD, VertexRDD MapPartitionsRDD[8] at mapPartitions at VertexRDD.scala:345, 2 -> GraphLoader.edgeListFile - edges (data/graphx/followers.txt), EdgeRDD, EdgeRDD MapPartitionsRDD[2] at mapPartitionsWithIndex at GraphLoader.scala:75)

scala> val cc = graph.connectedComponents()
17/09/20 15:33:39 WARN BlockManager: Block rdd_11_0 already exists on this machine; not re-adding it
cc: org.apache.spark.graphx.Graph[org.apache.spark.graphx.VertexId,Int] = org.apache.spark.graphx.impl.GraphImpl@2cc3b0a7

scala> sc.getPersistentRDDs
res1: scala.collection.Map[Int,org.apache.spark.rdd.RDD[_]] = Map(8 -> VertexRDD, VertexRDD MapPartitionsRDD[8] at mapPartitions at VertexRDD.scala:345, 2 -> GraphLoader.edgeListFile - edges (data/graphx/followers.txt), EdgeRDD, EdgeRDD MapPartitionsRDD[2] at mapPartitionsWithIndex at GraphLoader.scala:75, 38 -> EdgeRDD ZippedPartitionsRDD2[38] at zipPartitions at ReplicatedVertexView.scala:112, 32 -> VertexRDD ZippedPartitionsRDD2[32] at zipPartitions at VertexRDDImpl.scala:156)

scala> val cc = graph.connectedComponents()
cc: org.apache.spark.graphx.Graph[org.apache.spark.graphx.VertexId,Int] = org.apache.spark.graphx.impl.GraphImpl@69abff1d

scala> sc.getPersistentRDDs
res2: scala.collection.Map[Int,org.apache.spark.rdd.RDD[_]] = Map(38 -> EdgeRDD ZippedPartitionsRDD2[38] at zipPartitions at ReplicatedVertexView.scala:112, 70 -> VertexRDD ZippedPartitionsRDD2[70] at zipPartitions at VertexRDDImpl.scala:156, 2 -> GraphLoader.edgeListFile - edges (data/graphx/followers.txt), EdgeRDD, EdgeRDD MapPartitionsRDD[2] at mapPartitionsWithIndex at GraphLoader.scala:75, 32 -> VertexRDD ZippedPartitionsRDD2[32] at zipPartitions at VertexRDDImpl.scala:156, 76 -> EdgeRDD ZippedPartitionsRDD2[76] at zipPartitions at ReplicatedVertexView.scala:112, 8 -> VertexRDD, VertexRDD MapPartitionsRDD[8] at mapPartitions at VertexRDD.scala:345)
{code}-

Pregel is OK: the intermediate RDDs are unpersisted outside of the checkpointer.


> GBTs forgot to unpersist datasets cached by Checkpointer
> ---------------------------------------------------------
>
>                 Key: SPARK-22075
>                 URL: https://issues.apache.org/jira/browse/SPARK-22075
>             Project: Spark
>          Issue Type: Bug
>          Components: ML
>    Affects Versions: 2.3.0
>            Reporter: zhengruifeng
>
> {{PeriodicRDDCheckpointer}} automatically persists the last 3 datasets passed to {{PeriodicRDDCheckpointer.update}}.
> In GBTs, the last 3 intermediate RDDs are still cached after {{fit()}}:
> {code}
> scala> val dataset = spark.read.format("libsvm").load("./data/mllib/sample_kmeans_data.txt")
> dataset: org.apache.spark.sql.DataFrame = [label: double, features: vector]
>
> scala> dataset.persist()
> res0: dataset.type = [label: double, features: vector]
>
> scala> dataset.count
> res1: Long = 6
>
> scala> sc.getPersistentRDDs
> res2: scala.collection.Map[Int,org.apache.spark.rdd.RDD[_]] = Map(8 -> *FileScan libsvm [label#0,features#1] Batched: false, Format: LibSVM, Location: InMemoryFileIndex[file:/Users/zrf/.dev/spark-2.2.0-bin-hadoop2.7/data/mllib/sample_kmeans_data.txt], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<label:double,features:struct<type:tinyint,size:int,indices:array<int>,values:array<double>>> MapPartitionsRDD[8] at persist at <console>:26)
>
> scala> import org.apache.spark.ml.regression._
> import org.apache.spark.ml.regression._
>
> scala> val gbt = new GBTRegressor()
> gbt: org.apache.spark.ml.regression.GBTRegressor = gbtr_da1fe371a25e
>
> scala> val model = gbt.fit(dataset)
> 17/09/20 14:05:33 WARN DecisionTreeMetadata: DecisionTree reducing maxBins from 32 to 6 (= number of training instances)
> ... (the same WARN line is emitted 20 times in total, once per tree)
> model: org.apache.spark.ml.regression.GBTRegressionModel = GBTRegressionModel (uid=gbtr_da1fe371a25e) with 20 trees
>
> scala> sc.getPersistentRDDs
> res3: scala.collection.Map[Int,org.apache.spark.rdd.RDD[_]] = Map(322 -> MapPartitionsRDD[322] at mapPartitions at GradientBoostedTrees.scala:134, 307 -> MapPartitionsRDD[307] at mapPartitions at GradientBoostedTrees.scala:134, 8 -> *FileScan libsvm [label#0,features#1] Batched: false, Format: LibSVM, Location: InMemoryFileIndex[file:/Users/zrf/.dev/spark-2.2.0-bin-hadoop2.7/data/mllib/sample_kmeans_data.txt], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<label:double,features:struct<type:tinyint,size:int,indices:array<int>,values:array<double>>> MapPartitionsRDD[8] at persist at <console>:26, 292 -> MapPartitionsRDD[292] at mapPartitions at GradientBoostedTrees.scala:134)
>
> scala> sc.getPersistentRDDs.size
> res4: Int = 4
> {code}
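The description above says the checkpointer persists the last 3 datasets passed to {{update}}. The following is a minimal, illustrative sketch of that sliding-window persist/unpersist bookkeeping; the class and method names ({{SimplePersistQueue}}, {{maxPersisted}}, {{unpersistAll}}) are hypothetical and not the actual {{PeriodicRDDCheckpointer}} internals, which also manage checkpoint files:

{code}
import scala.collection.mutable
import org.apache.spark.rdd.RDD

// Illustrative sketch: keep at most `maxPersisted` RDDs cached at a time.
class SimplePersistQueue[T](maxPersisted: Int = 3) {
  private val persistedQueue = mutable.Queue.empty[RDD[T]]

  def update(newData: RDD[T]): Unit = {
    newData.persist()
    persistedQueue.enqueue(newData)
    // Older RDDs fall out of the window and are unpersisted; the most
    // recent `maxPersisted` stay cached -- which is why three intermediate
    // RDDs remain after fit() unless they are unpersisted explicitly.
    while (persistedQueue.size > maxPersisted) {
      persistedQueue.dequeue().unpersist(blocking = false)
    }
  }

  // The final cleanup step that, per this issue, GBTs forgot to perform.
  def unpersistAll(): Unit = {
    while (persistedQueue.nonEmpty) {
      persistedQueue.dequeue().unpersist(blocking = false)
    }
  }
}
{code}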
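Until a fix lands, one possible user-side workaround is to snapshot the set of cached RDD ids before training and unpersist whatever the run leaves behind. This is a hedged sketch, not an official API pattern; it assumes the {{sc}} and persisted {{dataset}} from the session above, and the diff-the-ids approach will also unpersist any RDDs an algorithm intentionally left cached (forcing recomputation if they are reused):

{code}
import org.apache.spark.ml.regression.GBTRegressor

// Remember which RDDs were already cached before training.
val cachedBefore = sc.getPersistentRDDs.keySet

val gbt = new GBTRegressor()
val model = gbt.fit(dataset)

// Unpersist only the RDDs that the fit() call left cached behind,
// keeping the user's own persisted dataset untouched.
sc.getPersistentRDDs
  .filter { case (id, _) => !cachedBefore.contains(id) }
  .values
  .foreach(_.unpersist(blocking = false))
{code}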