[
https://issues.apache.org/jira/browse/SPARK-8086?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Josh Rosen resolved SPARK-8086.
-------------------------------
Resolution: Invalid
I'm closing this as "Invalid" for now, since I wasn't paying close enough
attention to where the exception was being thrown. It turns out that
`saveAsTextFile` wasn't actually throwing here, which explains why adding the
`getFSBytesWrittenOnThreadCallback` call affected things. Sorry for being sloppy :(
> InputOutputMetricsSuite should not call side-effecting
> getFSBytesWrittenOnThreadCallback to detect whether we're running on Hadoop
> 2.5+
> ---------------------------------------------------------------------------------------------------------------------------------------
>
> Key: SPARK-8086
> URL: https://issues.apache.org/jira/browse/SPARK-8086
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Reporter: Josh Rosen
> Priority: Critical
>
> (This JIRA is a spinoff from SPARK-8062)
> While working to try to reproduce SPARK-8062 I noticed something rather
> curious in {{InputOutputMetricsSuite}}: the output metrics tests are guarded
> by {{if}} statements that check whether the bytesWrittenOnThreadCallback is
> defined:
> {code}
> test("output metrics when writing text file") {
>   val fs = FileSystem.getLocal(new Configuration())
>   val outPath = new Path(fs.getWorkingDirectory, "outdir")
>   if (SparkHadoopUtil.get.getFSBytesWrittenOnThreadCallback(outPath, fs.getConf).isDefined) {
>     // ... Body of test case ...
>   }
> }
> {code}
> AFAIK this check was introduced in order to prevent the test's assertions
> from failing under pre-Hadoop-2.5 versions of Hadoop.
> Now, take a look at the regression test that I added to try to reproduce this
> bug:
> {code}
> test("exceptions while getting IO thread statistics should not fail tasks / jobs (SPARK-8062)") {
>   FileSystem.getStatistics(null, classOf[FileSystem])
>   val fs = FileSystem.getLocal(new Configuration())
>   val outPath = new Path(fs.getWorkingDirectory, "outdir")
>   // This test passes when the following line is commented out. The following line
>   // therefore has some side-effects that are impacting the system under test:
>   SparkHadoopUtil.get.getFSBytesWrittenOnThreadCallback(outPath, fs.getConf).isDefined
>   val rdd = sc.parallelize(Array("a", "b", "c", "d"), 2)
>   try {
>     rdd.saveAsTextFile(outPath.toString)
>   } finally {
>     fs.delete(outPath, true)
>   }
> }
> {code}
> In this test, I try to pollute the global FileSystem statistics registry by
> storing a statistics entry for a filesystem with a null URI. For this test,
> all I care about is Spark not crashing, so I didn't add the {{if}} check (I
> don't need to worry about the assertions failing on pre-Hadoop-2.5 versions
> here since there aren't any assertions that check the metrics for this test).
> Surprisingly, though, my test did not fail until I added a
> {code}
> SparkHadoopUtil.get.getFSBytesWrittenOnThreadCallback(outPath, fs.getConf).isDefined
> {code}
> check outside of an {{if}} statement. This implies that this method has side
> effects which influence whether other metrics retrieval code is called. I
> worry that this may imply that our other InputOutputMetrics code could be
> broken for real production jobs. I'd like to investigate and fix this issue
> while also hardening this code. I think that we should be performing
> significantly more null checks on the inputs and outputs of Hadoop methods.
> We should also use a pure function to determine whether our Hadoop version
> supports these metrics, rather than calling a method that might have
> side effects (I think we can do this purely via reflection without actually
> creating any objects / calling any methods).
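The reflection-only check proposed at the end of the description could look roughly like the sketch below. It is not taken from Spark. The object and method names (`HadoopMetricsSupport`, `classDeclaresMethod`, `supportsBytesWrittenMetrics`) are hypothetical, and the Hadoop class name is an assumption about where Hadoop 2.5+ exposes per-thread statistics. The helper loads the class without initializing it and only inspects its declared methods, so no Hadoop objects are created and no Hadoop methods are invoked.

```scala
object HadoopMetricsSupport {
  // Assumption: Hadoop 2.5+ exposes per-thread byte counters on this nested class.
  val StatisticsDataClass: String =
    "org.apache.hadoop.fs.FileSystem$Statistics$StatisticsData"

  /** Returns true if `className` is loadable and declares a method named `methodName`. */
  def classDeclaresMethod(className: String, methodName: String): Boolean = {
    try {
      // initialize = false: load the class without running its static initializers.
      val cls = Class.forName(className, false, getClass.getClassLoader)
      cls.getDeclaredMethods.exists(_.getName == methodName)
    } catch {
      // Class missing from the classpath, or present but not linkable.
      case _: ClassNotFoundException | _: LinkageError => false
    }
  }

  /** Hypothetical version probe: does the classpath look like it supports output metrics? */
  def supportsBytesWrittenMetrics: Boolean =
    classDeclaresMethod(StatisticsDataClass, "getBytesWritten")
}
```

A test guard could then read `if (HadoopMetricsSupport.supportsBytesWrittenMetrics) { ... }` instead of calling `getFSBytesWrittenOnThreadCallback`, which keeps the version check free of any interaction with the global `FileSystem` statistics registry.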