[ https://issues.apache.org/jira/browse/SPARK-18565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15695809#comment-15695809 ]

Dmitry Dzhus commented on SPARK-18565:
--------------------------------------

The problem was that I assumed that caching and forcing an RDD guarantees that 
it will never be re-evaluated. inverseItemIDMap is built from a 
non-deterministic operation ({{zipWithUniqueId}}), so multiple uses of it can 
give different results:

{code}
val itemIDMap: RDD[(ContentKey, InternalContentId)] =
  rawEvents
    .map(_.content)
    .distinct           // shuffle: no placement or ordering guarantee
    .zipWithUniqueId()  // IDs depend on the partition layout
    .map { case (key, id) => (key, id.toInt) }
    .cache()            // caching does not pin results: evicted partitions are recomputed
logger.info(s"Built a map of ${itemIDMap.count()} item IDs")
val inverseItemIDMap: RDD[(InternalContentId, ContentKey)] =
  itemIDMap.map(_.swap).cache()
{code}
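
For context: with n partitions, {{zipWithUniqueId}} gives the element at offset 
i of partition k the id i * n + k, so IDs are stable only if the partition 
layout is. A minimal sketch of the scheme (the local {{SparkSession}} setup 
here is illustrative, not part of the job above):

{code}
import org.apache.spark.sql.SparkSession

// Illustrative local setup, not from the job above.
val spark = SparkSession.builder()
  .master("local[2]")
  .appName("zipWithUniqueIdDemo")
  .getOrCreate()
val sc = spark.sparkContext

// After a shuffle such as distinct, which element lands in which partition
// (and where inside it) is not guaranteed, so re-evaluating the lineage can
// hand the same element a different id.
val items = sc.parallelize(Seq("a", "b", "c", "d"), numSlices = 2)
items.zipWithUniqueId().collect().foreach(println)
// e.g. (a,0), (b,2), (c,1), (d,3) -- the mapping depends entirely on the layout
{code}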

I made the ID assignment stable by adding {{.sortBy(c => c)}} before 
{{.zipWithUniqueId()}}, and this solved the issue; the corrected fragment is 
sketched below.
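
Concretely, the stabilized fragment (same names as in the snippet above; 
{{ContentKey}} is a pair of Ints, so the default tuple ordering applies):

{code}
rawEvents
  .map(_.content)
  .distinct
  .sortBy(c => c)    // deterministic partitioning and per-partition order...
  .zipWithUniqueId() // ...so every evaluation assigns the same ids
  .map { case (key, id) => (key, id.toInt) }
  .cache()
{code}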

> subtractByKey modifies values in the source RDD
> -----------------------------------------------
>
>                 Key: SPARK-18565
>                 URL: https://issues.apache.org/jira/browse/SPARK-18565
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.0.2
>         Environment: Amazon Elastic MapReduce (emr-5.2.0)
>            Reporter: Dmitry Dzhus
>
> I'm experiencing a problem with {{subtractByKey}} using Spark 2.0.2 with 
> Scala 2.11.x:
> Relevant code:
> {code}
> object Types {
>   type ContentId = Int
>   type ContentKey = Tuple2[Int, ContentId]
>   type InternalContentId = Int
> }
> val inverseItemIDMap: RDD[(InternalContentId, ContentKey)] =
>   itemIDMap.map(_.swap).cache()
> logger.info(s"Built an inverse map of ${inverseItemIDMap.count()} item IDs")
> logger.info(inverseItemIDMap.collect().mkString("I->E ", "\nI->E ", ""))
> val superfluousItems: RDD[(InternalContentId, Int)] = .. .cache()
> logger.info(superfluousItems.collect().mkString("SI ", "\nSI ", ""))
> val filteredInverseItemIDMap: RDD[(InternalContentId, ContentKey)] =
>   inverseItemIDMap.subtractByKey(superfluousItems).cache() // <<===!!!
> logger.info(s"${filteredInverseItemIDMap.count()} items in the filtered inverse ID mapping")
> logger.info(filteredInverseItemIDMap.collect().mkString("F I->E ", "\nF I->E ", ""))
> {code}
> The operation in question is {{.subtractByKey}}. Both RDDs involved are 
> cached and forced via {{count()}} prior to calling {{subtractByKey}}, so I 
> would expect the result to be unaffected by how exactly {{superfluousItems}} 
> is built.
> I added debugging output and filtered the resulting logs by relevant 
> InternalContentId values (829911, 830071). Output:
> {code}
> Built an inverse map of 827354 item IDs
> .
> .
> I->E (829911,(2,1135081))
> I->E (830071,(1,2295102))
> .
> .
> 748190 items in the training set had less than 28 ratings
> SI (829911,3)
> .
> .
> 79164 items in the filtered inverse ID mapping
> F I->E (830071,(2,1135081))
> {code}
> There's no element with key 830071 in {{superfluousItems}} (SI), so it's not 
> removed from the source RDD. However, its value is for some reason replaced 
> with the one from key 829911. How could this be? I cannot reproduce it 
> locally - only when running on a multi-machine cluster. Is this a bug, or am 
> I missing something?


