Josh Rosen created SPARK-8086:
---------------------------------
Summary: InputOutputMetricsSuite should not call side-effecting
getFSBytesWrittenOnThreadCallback to detect whether we're running on Hadoop 2.5+
Key: SPARK-8086
URL: https://issues.apache.org/jira/browse/SPARK-8086
Project: Spark
Issue Type: Bug
Components: Spark Core
Reporter: Josh Rosen
Priority: Critical
(This JIRA is a spinoff from SPARK-8062)
While trying to reproduce SPARK-8062, I noticed something rather curious
in {{InputOutputMetricsSuite}}: the output metrics tests are guarded by {{if}}
statements that check whether the bytesWrittenOnThreadCallback is defined:
{code}
test("output metrics when writing text file") {
  val fs = FileSystem.getLocal(new Configuration())
  val outPath = new Path(fs.getWorkingDirectory, "outdir")
  if (SparkHadoopUtil.get.getFSBytesWrittenOnThreadCallback(outPath, fs.getConf).isDefined) {
    // ... Body of test case ...
  }
}
{code}
AFAIK this check was introduced to prevent the test's assertions from
failing under pre-Hadoop-2.5 versions of Hadoop.
Now, take a look at the regression test that I added to try to reproduce
SPARK-8062:
{code}
test("exceptions while getting IO thread statistics should not fail tasks / jobs (SPARK-8062)") {
  FileSystem.getStatistics(null, classOf[FileSystem])
  val fs = FileSystem.getLocal(new Configuration())
  val outPath = new Path(fs.getWorkingDirectory, "outdir")
  // This test passes unless the following line is commented out. The following line therefore
  // has some side-effects that are impacting the system under test:
  SparkHadoopUtil.get.getFSBytesWrittenOnThreadCallback(outPath, fs.getConf).isDefined
  val rdd = sc.parallelize(Array("a", "b", "c", "d"), 2)
  try {
    rdd.saveAsTextFile(outPath.toString)
  } finally {
    fs.delete(outPath, true)
  }
}
{code}
In this test, I try to pollute the global FileSystem statistics registry by
storing a statistics entry for a filesystem with a null URI. For this test,
all I care about is Spark not crashing, so I didn't add the {{if}} check (I
don't need to worry about the assertions failing on pre-Hadoop-2.5 versions
here since there aren't any assertions that check the metrics for this test).
Surprisingly, though, my test was unable to fail until I added a
{code}
SparkHadoopUtil.get.getFSBytesWrittenOnThreadCallback(outPath, fs.getConf).isDefined
{code}
check outside of an {{if}} statement. This implies that the method has side
effects that influence whether other metrics-retrieval code is called, and I
worry that our other InputOutputMetrics code could therefore be broken for
real production jobs. I'd like to investigate and fix this issue while also
hardening the code. I think that we should perform significantly more null
checks on the inputs and outputs of Hadoop methods, and that we should use a
pure function to determine whether our Hadoop version supports these metrics
rather than calling a method that might have side effects (I think we can do
this purely via reflection without actually creating any objects / calling
any methods).
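
To make the reflection idea concrete, here is a rough sketch of what such a pure check might look like. The object and method names ({{HadoopMetricsProbe}}, {{hasZeroArgMethod}}, {{supportsThreadStatistics}}) are hypothetical, and I'm assuming that the presence of a zero-argument {{getThreadStatistics}} method on {{FileSystem.Statistics}} is a reliable marker for Hadoop 2.5+. That assumption would need to be confirmed against the Hadoop sources before relying on it.

```scala
object HadoopMetricsProbe {
  // ASSUMPTION: a zero-arg getThreadStatistics on FileSystem.Statistics
  // marks Hadoop 2.5+; verify against the Hadoop sources before use.
  val StatisticsClassName = "org.apache.hadoop.fs.FileSystem$Statistics"
  val ThreadStatisticsMethodName = "getThreadStatistics"

  /**
   * Hypothetical helper: returns true if `className` can be loaded and
   * declares a zero-argument method called `methodName`.
   * The class is loaded with initialize = false, so no static initializers
   * run, and the method is only looked up, never invoked.
   */
  def hasZeroArgMethod(
      className: String,
      methodName: String,
      loader: ClassLoader = getClass.getClassLoader): Boolean = {
    try {
      val cls = Class.forName(className, false, loader)
      cls.getDeclaredMethod(methodName)
      true
    } catch {
      // Missing class, missing method, or a class that cannot be linked
      // are all treated as "not supported".
      case _: ClassNotFoundException | _: NoSuchMethodException | _: LinkageError =>
        false
    }
  }

  /** Pure capability check: no FileSystem objects created, no Hadoop methods called. */
  def supportsThreadStatistics: Boolean =
    hasZeroArgMethod(StatisticsClassName, ThreadStatisticsMethodName)
}
```

With something like this, the test guards could become {{if (HadoopMetricsProbe.supportsThreadStatistics)}}, which should not touch the global FileSystem statistics registry at all.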