[ https://issues.apache.org/jira/browse/SPARK-18565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15695809#comment-15695809 ]
Dmitry Dzhus commented on SPARK-18565:
--------------------------------------

The problem was that I assumed that caching and forcing an RDD guarantees that it will never be re-evaluated. inverseItemIDMap is built using a non-deterministic operation, so multiple evaluations of it can give different results:

{code}
val itemIDMap: RDD[(ContentKey, InternalContentId)] =
  rawEvents
    .map(_.content)
    .distinct
    .zipWithUniqueId()
    .map(u => (u._1, u._2.toInt))
    .cache()

logger.info(s"Built a map of ${itemIDMap.count()} item IDs")

val inverseItemIDMap: RDD[(InternalContentId, ContentKey)] =
  itemIDMap.map(_.swap).cache()
{code}

I made the operation stable by adding {{.sortBy(c => c)}} and this solved the issue.

> subtractByKey modifes values in the source RDD
> ----------------------------------------------
>
>                 Key: SPARK-18565
>                 URL: https://issues.apache.org/jira/browse/SPARK-18565
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.0.2
>         Environment: Amazon Elastic MapReduce (emr-5.2.0)
>            Reporter: Dmitry Dzhus
>
> I'm experiencing a problem with subtractByKey using Spark 2.0.2 with Scala
> 2.11.x:
> Relevant code:
> {code}
> object Types {
>   type ContentId = Int
>   type ContentKey = Tuple2[Int, ContentId]
>   type InternalContentId = Int
> }
> val inverseItemIDMap: RDD[(InternalContentId, ContentKey)] =
>   itemIDMap.map(_.swap).cache()
> logger.info(s"Built an inverse map of ${inverseItemIDMap.count()} item IDs")
> logger.info(inverseItemIDMap.collect().mkString("I->E ", "\nI->E ", ""))
> val superfluousItems: RDD[(InternalContentId, Int)] = .. .cache()
> logger.info(superfluousItems.collect().mkString("SI ", "\nSI ", ""))
> val filteredInverseItemIDMap: RDD[(InternalContentId, ContentKey)] =
>   inverseItemIDMap.subtractByKey(superfluousItems).cache() // <<===!!!
> logger.info(s"${filteredInverseItemIDMap.count()} items in the filtered inverse ID mapping")
> logger.info(filteredInverseItemIDMap.collect().mkString("F I->E ", "\nF I->E ", ""))
> {code}
> The operation in question is {{.subtractByKey}}. Both RDDs involved are
> cached and forced via {{count()}} prior to calling {{subtractByKey}}, so I
> would expect the result to be unaffected by how exactly superfluousItems is
> built.
> I added debugging output and filtered the resulting logs by relevant
> InternalContentId values (829911, 830071). Output:
> {code}
> Built an inverse map of 827354 item IDs
> .
> .
> I->E (829911,(2,1135081))
> I->E (830071,(1,2295102))
> .
> .
> 748190 items in the training set had less than 28 ratings
> SI (829911,3)
> .
> .
> 79164 items in the filtered inverse ID mapping
> F I->E (830071,(2,1135081))
> {code}
> There's no element with key 830071 in {{superfluousItems}} (SI), so it's not
> removed from the source RDD. However, its value is for some reason replaced
> with the one from key 829911. How could this be? I cannot reproduce it
> locally, only when running on a multi-machine cluster. Is this a bug, or am
> I missing something?
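The explanation in the comment can be illustrated with a minimal sketch in plain Scala (no Spark involved). Spark's cache is best-effort: a cached partition that is evicted or lost is recomputed from its lineage, and if the lineage numbers elements by where they happen to land, a recomputation may pair the same ID with a different key. The sketch below imitates that with two "evaluations" that see the same keys in different orders. The names StableIdDemo, assignByArrival and assignSorted are hypothetical and exist only for illustration, and the arrival order is an assumption; the two content keys are the ones shown in the log output of the issue.

```scala
object StableIdDemo {
  type ContentKey = (Int, Int)
  type InternalContentId = Int

  // Number keys by arrival position. The result depends on the input order,
  // loosely mirroring how position-based numbering depends on where elements
  // end up after a shuffle (an analogy, not Spark's actual algorithm).
  def assignByArrival(keys: Seq[ContentKey]): Map[ContentKey, InternalContentId] =
    keys.zipWithIndex.toMap

  // Sort before numbering, so the IDs depend only on the set of keys.
  def assignSorted(keys: Seq[ContentKey]): Map[ContentKey, InternalContentId] =
    keys.sorted.zipWithIndex.toMap

  def main(args: Array[String]): Unit = {
    // Two evaluations of the same logical data, arriving in different orders
    // (the orders are an assumption made for this illustration).
    val firstRun: Seq[ContentKey] = Seq((2, 1135081), (1, 2295102))
    val secondRun: Seq[ContentKey] = firstRun.reverse

    // Arrival-based numbering: ID 0 points at different keys in each run.
    val inverseFirst = assignByArrival(firstRun).map(_.swap)
    val inverseSecond = assignByArrival(secondRun).map(_.swap)
    println(s"arrival-based, ID 0: ${inverseFirst(0)} vs ${inverseSecond(0)}")

    // Sorted numbering: both runs produce the same map.
    println(s"sorted numbering agrees: ${assignSorted(firstRun) == assignSorted(secondRun)}")
  }
}
```

In the Spark pipeline from the comment, the analogous change is the added {{.sortBy(c => c)}}; the comment does not say where in the chain it was placed, so the sketch makes no claim about that.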