Dmitry Dzhus created SPARK-18565:
------------------------------------

             Summary: subtractByKey modifies values in the source RDD
                 Key: SPARK-18565
                 URL: https://issues.apache.org/jira/browse/SPARK-18565
             Project: Spark
          Issue Type: Bug
          Components: Spark Core
    Affects Versions: 2.0.2
         Environment: Amazon Elastic MapReduce (emr-5.2.0)
            Reporter: Dmitry Dzhus


I'm experiencing a problem with subtractByKey using Spark 2.0.2 with Scala 
2.11.x:

Relevant code:

{code}
object Types {
   type ContentId = Int
   type ContentKey = Tuple2[Int, ContentId]
   type InternalContentId = Int
}

val inverseItemIDMap: RDD[(InternalContentId, ContentKey)] = 
itemIDMap.map(_.swap).cache()
logger.info(s"Built an inverse map of ${inverseItemIDMap.count()} item IDs")
logger.info(inverseItemIDMap.collect().mkString("I->E ", "\nI->E ", ""))

val superfluousItems: RDD[(InternalContentId, Int)] = .. .cache()
logger.info(superfluousItems.collect().mkString("SI ", "\nSI ", ""))

val filteredInverseItemIDMap: RDD[(InternalContentId, ContentKey)] = 
  inverseItemIDMap.subtractByKey(superfluousItems).cache() // <<===!!!
logger.info(s"${filteredInverseItemIDMap.count()} items in the filtered inverse ID mapping")
logger.info(filteredInverseItemIDMap.collect().mkString("F I->E ", "\nF I->E ", ""))
{code}

The operation in question is {{.subtractByKey}}. Both RDDs involved are cached 
and forced via {{count()}} prior to calling {{subtractByKey}}, so I would 
expect the result to be unaffected by how exactly superfluousItems is built.

I added debugging output and filtered the resulting logs by relevant 
InternalContentId values (829911, 830071). Output:

{code}
Built an inverse map of 827354 item IDs
.
.
I->E (829911,(2,1135081))
I->E (830071,(1,2295102))
.
.
748190 items in the training set had less than 28 ratings
SI (829911,3)
.
.
79164 items in the filtered inverse ID mapping
F I->E (830071,(2,1135081))
{code}

There's no element with key 830071 in {{superfluousItems}} (SI), so it's not 
removed from the source RDD. However, its value is for some reason replaced 
with the one from key 829911. How could this be? I cannot reproduce this 
locally, only when running on a multi-machine cluster. Is this a bug, or am I 
missing something?
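
For reference, the result expected for the two entries shown in the logs can be modeled as follows. This is a minimal sketch using plain Scala collections rather than Spark itself; the object name {{SubtractByKeyModel}} and the helper {{subtractByKey}} defined here are hypothetical stand-ins that only mirror the documented contract of the RDD method (keep every pair whose key is absent from the other collection, leaving values untouched).

```scala
object SubtractByKeyModel {
  type ContentId = Int
  type ContentKey = (Int, ContentId)
  type InternalContentId = Int

  // Hypothetical model of the subtractByKey contract: keep every pair from
  // `left` whose key does not occur in `right`. Values are never rewritten.
  def subtractByKey[K, V, W](left: Seq[(K, V)], right: Seq[(K, W)]): Seq[(K, V)] = {
    val rightKeys = right.map(_._1).toSet
    left.filterNot { case (k, _) => rightKeys.contains(k) }
  }

  // The two inverse-map entries and the one superfluous item from the logs.
  val inverse: Seq[(InternalContentId, ContentKey)] =
    Seq((829911, (2, 1135081)), (830071, (1, 2295102)))
  val superfluous: Seq[(InternalContentId, Int)] =
    Seq((829911, 3))

  val filtered: Seq[(InternalContentId, ContentKey)] =
    subtractByKey(inverse, superfluous)

  def main(args: Array[String]): Unit =
    // prints: F I->E (830071,(1,2295102))
    println(filtered.mkString("F I->E ", "\nF I->E ", ""))
}
```

Under this model key 830071 keeps its original value (1,2295102), whereas the cluster run reports (2,1135081) for it.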



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
