[
https://issues.apache.org/jira/browse/SPARK-10536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14739077#comment-14739077
]
Erik Schmiegelow commented on SPARK-10536:
------------------------------------------
We were discussing object reuse by the InputFormat as a possible cause - thanks
for confirming. I do agree that holding on to the records as long-lived objects
is an unusual use case - this program is a debugging tool - so it doesn't really
make sense to clone the records and override the default behaviour, especially
since Scala offers no convenient copy facility for non-case classes.
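For reference, a clone along the lines we decided against might look like the
sketch below - purely illustrative, assuming Avro's reflect API
(ReflectData.getSchema / deepCopy) can handle our Visitor class; it is not code
we actually ran.
{code}
// Illustrative only: deep-copy each reused Avro record on the executor before
// doing anything else with it, so later references don't all point at the one
// object the RecordReader keeps mutating. Assumes Visitor works with ReflectData.
import org.apache.avro.mapred.AvroKey
import org.apache.avro.mapreduce.AvroKeyInputFormat
import org.apache.avro.reflect.ReflectData
import org.apache.hadoop.io.NullWritable

val visitors = sparkContext.newAPIHadoopFile(
    input,
    classOf[AvroKeyInputFormat[Visitor]],
    classOf[AvroKey[Visitor]],
    classOf[NullWritable])
  .map { f =>
    // Look up the schema inside the closure to avoid shipping it from the driver.
    val schema = ReflectData.get().getSchema(classOf[Visitor])
    ReflectData.get().deepCopy(schema, f._1.datum())
  }
{code}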
I was surprised that the behaviour persisted even when the objects were packed
into tuples, but I suppose Spark decomposes those automatically.
By the way, datum() is a method on AvroKey which returns the Visitor - working
with the deserialized JSON did the trick in this case.
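What did work for us, concretely, was to render each matching Visitor as a JSON
string on the executors and collect plain strings instead of the Avro-backed
POJOs. Roughly as follows (sketch only - `visitors` stands for the RDD[Visitor]
produced by the .map(f => f._1.datum()) step in the snippet below, and the
com.fasterxml ObjectMapper import is an assumption):
{code}
// Sketch of the JSON workaround: the string is rendered while the record still
// holds the matching data, so only immutable strings reach the driver.
import com.fasterxml.jackson.databind.ObjectMapper

val matches: Array[String] = visitors
  .filter(visitor => visitor.eid == eid)
  .map { visitor =>
    val mapper = new ObjectMapper()
    mapper.writerWithDefaultPrettyPrinter().writeValueAsString(visitor)
  }
  .collect()

matches.foreach(System.err.println)
{code}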
> filtered POJOs replaced by other instances after collect()
> ----------------------------------------------------------
>
> Key: SPARK-10536
> URL: https://issues.apache.org/jira/browse/SPARK-10536
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 1.4.1
> Reporter: Erik Schmiegelow
>
> I've encountered a very strange phenomenon with collect() in a simple
> program written for debugging purposes.
> The objective of the program is to filter objects which match an id and print
> their contents to stderr so that we can have a look at the contents. Our
> initial plan was to have the driver do that, because we run our applications
> in a YARN cluster and we didn't want to have to look for the executor
> instance first before looking at the log files.
> We then discovered that the results after collect() didn't match the ids for
> which we had filtered, so we added a few debugging statements to find out
> what was happening. Interestingly enough, we see the correct instances when
> we inspect them in filter() or map() - on the executors. Once the instances
> are sent back to the driver, they have been swapped for other instances. More
> intriguingly, we always get the same set of incorrect instances.
> Here's the code:
> {code}
> val rdd = sparkContext.newAPIHadoopFile(
>     input,
>     classOf[AvroKeyInputFormat[Visitor]],
>     classOf[AvroKey[Visitor]],
>     classOf[NullWritable])
>   .map(f => f._1.datum())
>   .filter(visitor => {
>     val result = visitor.eid == eid
>     if (result) {
>       println(s"Found match ${visitor.eid} for $eid with hash ${visitor.hashCode()}")
>       val mapper = new ObjectMapper()
>       System.err.println(mapper.writerWithDefaultPrettyPrinter().writeValueAsString(visitor))
>     }
>     result
>   })
>   .map(f => {
>     val mapper = new ObjectMapper()
>     println(s"Map Output of visitor ${f.eid} for $eid with hash ${f.hashCode()}")
>     System.err.println(mapper.writerWithDefaultPrettyPrinter().writeValueAsString(f))
>     f
>   })
>   .collect()
>   .foreach(f => {
>     val mapper = new ObjectMapper()
>     println(s"Collect Output of visitor ${f.eid} for $eid with hash ${f.hashCode()}")
>     System.err.println(mapper.writerWithDefaultPrettyPrinter().writeValueAsString(f))
>     f
>   })
> {code}
> The output we get in the Executor (filter + map) is as follows:
> {code}
> Found match 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Map Output of visitor 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Found match 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Map Output of visitor 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Found match 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Map Output of visitor 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Found match 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Map Output of visitor 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Found match 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Map Output of visitor 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Found match 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Map Output of visitor 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Found match 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Map Output of visitor 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Found match 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Map Output of visitor 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Found match 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Map Output of visitor 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Found match 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Map Output of visitor 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Found match 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Map Output of visitor 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Found match 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Map Output of visitor 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Found match 4143922947700659073 for 4143922947700659073 with hash 1105567550
> Map Output of visitor 4143922947700659073 for 4143922947700659073 with hash 1105567550
> {code}
> The output on the driver is this:
> {code}
> Collect Output of visitor 4143521127100013504 for 4143922947700659073 with hash 1405504607
> Collect Output of visitor 4143521127100013504 for 4143922947700659073 with hash 1405504607
> Collect Output of visitor 4143521127100013504 for 4143922947700659073 with hash 1405504607
> Collect Output of visitor 4143521127100013504 for 4143922947700659073 with hash 1405504607
> Collect Output of visitor 4143521127100013504 for 4143922947700659073 with hash 1405504607
> Collect Output of visitor 4143521127100013504 for 4143922947700659073 with hash 1405504607
> Collect Output of visitor 4143521127100013504 for 4143922947700659073 with hash 1405504607
> Collect Output of visitor 4143521127100013504 for 4143922947700659073 with hash 1405504607
> Collect Output of visitor 4143521127100013504 for 4143922947700659073 with hash 1405504607
> Collect Output of visitor 4143521127100013504 for 4143922947700659073 with hash 1405504607
> Collect Output of visitor 4143521127100013504 for 4143922947700659073 with hash 1405504607
> Collect Output of visitor 4143521127100013504 for 4143922947700659073 with hash 1405504607
> Collect Output of visitor 4143521127100013504 for 4143922947700659073 with hash 1405504607
> {code}
> Some notes on the input:
> - we use reflective Avro to serialize to HDFS
> - we've got about 5 GB of data