Erik Schmiegelow created SPARK-10536:
----------------------------------------
Summary: filtered POJOs replaced by other instances after collect()
Key: SPARK-10536
URL: https://issues.apache.org/jira/browse/SPARK-10536
Project: Spark
Issue Type: Bug
Components: Spark Core
Affects Versions: 1.4.1
Reporter: Erik Schmiegelow
I've encountered a very strange phenomenon with collect() in a simplistic
program written for debugging purposes.
The objective of the program is to filter objects which match an id and print
their contents to stderr so that we can have a look at the contents. Our
initial plan was to have the driver do that, because we run our applications in
a YARN cluster and we didn't want to have to look for the executor instance
first before looking at the log files.
We then discovered that the results after collect() didn't match the ids we had
filtered for, so we added a few debugging statements to find out what was
happening. Interestingly enough, we see the correct instances inside filter()
and map() on the executor; once the results are sent back to the driver, the
instances have been swapped. More intriguingly, we always get the same set of
incorrect instances.
Here's the code:
val rdd = sparkContext.newAPIHadoopFile(
  input, classOf[AvroKeyInputFormat[Visitor]], classOf[AvroKey[Visitor]],
  classOf[NullWritable]
).map(
  f => f._1.datum()
).filter(visitor => {
  val result = visitor.eid == eid
  if (result) {
    println(s"Found match ${visitor.eid} for $eid with hash ${visitor.hashCode()}")
    val mapper = new ObjectMapper()
    System.err.println(mapper.writerWithDefaultPrettyPrinter().writeValueAsString(visitor))
  }
  result
}).map(f => {
  val mapper = new ObjectMapper()
  println(s"Map Output of visitor ${f.eid} for $eid with hash ${f.hashCode()}")
  System.err.println(mapper.writerWithDefaultPrettyPrinter().writeValueAsString(f))
  f
}).collect().foreach(f => {
  val mapper = new ObjectMapper()
  println(s"Collect Output of visitor ${f.eid} for $eid with hash ${f.hashCode()}")
  System.err.println(mapper.writerWithDefaultPrettyPrinter().writeValueAsString(f))
})
The output we get in the Executor (filter + map) is as follows:
Found match 4143922947700659073 for 4143922947700659073 with hash 1105567550
Map Output of visitor 4143922947700659073 for 4143922947700659073 with hash 1105567550
Found match 4143922947700659073 for 4143922947700659073 with hash 1105567550
Map Output of visitor 4143922947700659073 for 4143922947700659073 with hash 1105567550
Found match 4143922947700659073 for 4143922947700659073 with hash 1105567550
Map Output of visitor 4143922947700659073 for 4143922947700659073 with hash 1105567550
Found match 4143922947700659073 for 4143922947700659073 with hash 1105567550
Map Output of visitor 4143922947700659073 for 4143922947700659073 with hash 1105567550
Found match 4143922947700659073 for 4143922947700659073 with hash 1105567550
Map Output of visitor 4143922947700659073 for 4143922947700659073 with hash 1105567550
Found match 4143922947700659073 for 4143922947700659073 with hash 1105567550
Map Output of visitor 4143922947700659073 for 4143922947700659073 with hash 1105567550
Found match 4143922947700659073 for 4143922947700659073 with hash 1105567550
Map Output of visitor 4143922947700659073 for 4143922947700659073 with hash 1105567550
Found match 4143922947700659073 for 4143922947700659073 with hash 1105567550
Map Output of visitor 4143922947700659073 for 4143922947700659073 with hash 1105567550
Found match 4143922947700659073 for 4143922947700659073 with hash 1105567550
Map Output of visitor 4143922947700659073 for 4143922947700659073 with hash 1105567550
Found match 4143922947700659073 for 4143922947700659073 with hash 1105567550
Map Output of visitor 4143922947700659073 for 4143922947700659073 with hash 1105567550
Found match 4143922947700659073 for 4143922947700659073 with hash 1105567550
Map Output of visitor 4143922947700659073 for 4143922947700659073 with hash 1105567550
Found match 4143922947700659073 for 4143922947700659073 with hash 1105567550
Map Output of visitor 4143922947700659073 for 4143922947700659073 with hash 1105567550
Found match 4143922947700659073 for 4143922947700659073 with hash 1105567550
Map Output of visitor 4143922947700659073 for 4143922947700659073 with hash 1105567550
The output on the driver is this:
Collect Output of visitor 4143521127100013504 for 4143922947700659073 with hash 1405504607
Collect Output of visitor 4143521127100013504 for 4143922947700659073 with hash 1405504607
Collect Output of visitor 4143521127100013504 for 4143922947700659073 with hash 1405504607
Collect Output of visitor 4143521127100013504 for 4143922947700659073 with hash 1405504607
Collect Output of visitor 4143521127100013504 for 4143922947700659073 with hash 1405504607
Collect Output of visitor 4143521127100013504 for 4143922947700659073 with hash 1405504607
Collect Output of visitor 4143521127100013504 for 4143922947700659073 with hash 1405504607
Collect Output of visitor 4143521127100013504 for 4143922947700659073 with hash 1405504607
Collect Output of visitor 4143521127100013504 for 4143922947700659073 with hash 1405504607
Collect Output of visitor 4143521127100013504 for 4143922947700659073 with hash 1405504607
Collect Output of visitor 4143521127100013504 for 4143922947700659073 with hash 1405504607
Collect Output of visitor 4143521127100013504 for 4143922947700659073 with hash 1405504607
Collect Output of visitor 4143521127100013504 for 4143922947700659073 with hash 1405504607
Some notes on the input:
- we use reflective Avro to serialize to HDFS
- we've got about 5 GB of data
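A plausible explanation (not verified against our setup) is record reuse:
AvroKeyInputFormat, like most Hadoop input formats, reuses the same AvroKey and
datum instance for every record in a split, so mapping with f._1.datum() hands
out references that all alias one mutable buffer; by the time collect() ships
the results to the driver they may all reflect whatever the buffer last held.
A sketch of the usual workaround is to deep-copy each datum before anything
else touches it. ReflectData is assumed here because we use reflective Avro;
Visitor, input, and eid are as in the snippet above:

val rdd = sparkContext.newAPIHadoopFile(
  input, classOf[AvroKeyInputFormat[Visitor]], classOf[AvroKey[Visitor]],
  classOf[NullWritable]
).map { case (key, _) =>
  // Deep-copy the datum out of the reused record buffer so that
  // downstream elements no longer alias the same mutable instance.
  val data = ReflectData.get()
  data.deepCopy(data.getSchema(classOf[Visitor]), key.datum())
}.filter(_.eid == eid)

If this is the cause, the behavior would be a known Hadoop-integration pitfall
rather than a collect() bug, but it may still be worth a warning in the docs.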