Repository: spark Updated Branches: refs/heads/master c9e05ca27 -> dea302ddb
SPARK-2621. Update task InputMetrics incrementally The patch takes advantage an API provided in Hadoop 2.5 that allows getting accurate data on Hadoop FileSystem bytes read. It eliminates the old method, which naively accepts the split size as the input bytes. An impact of this change will be that input metrics go away when using against Hadoop versions earlier thatn 2.5. I can add this back in, but my opinion is that no metrics are better than inaccurate metrics. This is difficult to write a test for because we don't usually build against a version of Hadoop that contains the function we need. I've tested it manually on a pseudo-distributed cluster. Author: Sandy Ryza <[email protected]> Closes #2087 from sryza/sandy-spark-2621 and squashes the following commits: 23010b8 [Sandy Ryza] Missing style fixes 74fc9bb [Sandy Ryza] Make getFSBytesReadOnThreadCallback private 1ab662d [Sandy Ryza] Clear things up a bit 984631f [Sandy Ryza] Switch from pull to push model and add test 7ef7b22 [Sandy Ryza] Add missing curly braces 219abc9 [Sandy Ryza] Fall back to split size 90dbc14 [Sandy Ryza] SPARK-2621. Update task InputMetrics incrementally Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/dea302dd Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/dea302dd Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/dea302dd Branch: refs/heads/master Commit: dea302ddbd26b1f20fb8a3979bd1d8e1717479f8 Parents: c9e05ca Author: Sandy Ryza <[email protected]> Authored: Mon Oct 27 10:04:24 2014 -0700 Committer: Patrick Wendell <[email protected]> Committed: Mon Oct 27 10:04:24 2014 -0700 ---------------------------------------------------------------------- .../apache/spark/deploy/SparkHadoopUtil.scala | 30 +++++++++++ .../org/apache/spark/executor/TaskMetrics.scala | 1 - .../scala/org/apache/spark/rdd/HadoopRDD.scala | 48 ++++++++++++++---- .../org/apache/spark/rdd/NewHadoopRDD.scala | 48 ++++++++++++++---- .../scala/org/apache/spark/util/Utils.scala | 11 ++++ .../spark/metrics/InputMetricsSuite.scala | 53 ++++++++++++++++++++ 6 files changed, 170 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/dea302dd/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala index fe0ad9e..e28eaad 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala @@ -20,12 +20,15 @@ package org.apache.spark.deploy import java.security.PrivilegedExceptionAction import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.hadoop.fs.FileSystem.Statistics import org.apache.hadoop.mapred.JobConf import org.apache.hadoop.security.Credentials import org.apache.hadoop.security.UserGroupInformation import org.apache.spark.{Logging, SparkContext, SparkConf, SparkException} import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.util.Utils import scala.collection.JavaConversions._ @@ -121,6 +124,33 @@ class SparkHadoopUtil extends Logging { UserGroupInformation.loginUserFromKeytab(principalName, keytabFilename) } + /** + * Returns a function that can be called to find Hadoop FileSystem bytes read. If + * getFSBytesReadOnThreadCallback is called from thread r at time t, the returned callback will + * return the bytes read on r since t. Reflection is required because thread-level FileSystem + * statistics are only available as of Hadoop 2.5 (see HADOOP-10688). + * Returns None if the required method can't be found. + */ + private[spark] def getFSBytesReadOnThreadCallback(path: Path, conf: Configuration) + : Option[() => Long] = { + val qualifiedPath = path.getFileSystem(conf).makeQualified(path) + val scheme = qualifiedPath.toUri().getScheme() + val stats = FileSystem.getAllStatistics().filter(_.getScheme().equals(scheme)) + try { + val threadStats = stats.map(Utils.invoke(classOf[Statistics], _, "getThreadStatistics")) + val statisticsDataClass = + Class.forName("org.apache.hadoop.fs.FileSystem$Statistics$StatisticsData") + val getBytesReadMethod = statisticsDataClass.getDeclaredMethod("getBytesRead") + val f = () => threadStats.map(getBytesReadMethod.invoke(_).asInstanceOf[Long]).sum + val baselineBytesRead = f() + Some(() => f() - baselineBytesRead) + } catch { + case e: NoSuchMethodException => { + logDebug("Couldn't find method for retrieving thread-level FileSystem input data", e) + None + } + } + } } object SparkHadoopUtil { http://git-wip-us.apache.org/repos/asf/spark/blob/dea302dd/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala index 3e49b62..57bc2b4 100644 --- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala @@ -169,7 +169,6 @@ case class InputMetrics(readMethod: DataReadMethod.Value) { var bytesRead: Long = 0L } - /** * :: DeveloperApi :: * Metrics pertaining to shuffle data read in a given task. http://git-wip-us.apache.org/repos/asf/spark/blob/dea302dd/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala index 7751417..946fb56 100644 --- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala @@ -46,7 +46,6 @@ import org.apache.spark.rdd.HadoopRDD.HadoopMapPartitionsWithSplitRDD import org.apache.spark.util.{NextIterator, Utils} import org.apache.spark.scheduler.{HostTaskLocation, HDFSCacheTaskLocation} - /** * A Spark split class that wraps around a Hadoop InputSplit. */ @@ -224,18 +223,18 @@ class HadoopRDD[K, V]( val key: K = reader.createKey() val value: V = reader.createValue() - // Set the task input metrics. val inputMetrics = new InputMetrics(DataReadMethod.Hadoop) - try { - /* bytesRead may not exactly equal the bytes read by a task: split boundaries aren't - * always at record boundaries, so tasks may need to read into other splits to complete - * a record. */ - inputMetrics.bytesRead = split.inputSplit.value.getLength() - } catch { - case e: java.io.IOException => - logWarning("Unable to get input size to set InputMetrics for task", e) + // Find a function that will return the FileSystem bytes read by this thread. + val bytesReadCallback = if (split.inputSplit.value.isInstanceOf[FileSplit]) { + SparkHadoopUtil.get.getFSBytesReadOnThreadCallback( + split.inputSplit.value.asInstanceOf[FileSplit].getPath, jobConf) + } else { + None + } + if (bytesReadCallback.isDefined) { + context.taskMetrics.inputMetrics = Some(inputMetrics) } - context.taskMetrics.inputMetrics = Some(inputMetrics) + var recordsSinceMetricsUpdate = 0 override def getNext() = { try { @@ -244,12 +243,36 @@ class HadoopRDD[K, V]( case eof: EOFException => finished = true } + + // Update bytes read metric every few records + if (recordsSinceMetricsUpdate == HadoopRDD.RECORDS_BETWEEN_BYTES_READ_METRIC_UPDATES + && bytesReadCallback.isDefined) { + recordsSinceMetricsUpdate = 0 + val bytesReadFn = bytesReadCallback.get + inputMetrics.bytesRead = bytesReadFn() + } else { + recordsSinceMetricsUpdate += 1 + } (key, value) } override def close() { try { reader.close() + if (bytesReadCallback.isDefined) { + val bytesReadFn = bytesReadCallback.get + inputMetrics.bytesRead = bytesReadFn() + } else if (split.inputSplit.value.isInstanceOf[FileSplit]) { + // If we can't get the bytes read from the FS stats, fall back to the split size, + // which may be inaccurate. + try { + inputMetrics.bytesRead = split.inputSplit.value.getLength + context.taskMetrics.inputMetrics = Some(inputMetrics) + } catch { + case e: java.io.IOException => + logWarning("Unable to get input size to set InputMetrics for task", e) + } + } } catch { case e: Exception => { if (!Utils.inShutdown()) { @@ -302,6 +325,9 @@ private[spark] object HadoopRDD extends Logging { */ val CONFIGURATION_INSTANTIATION_LOCK = new Object() + /** Update the input bytes read metric each time this number of records has been read */ + val RECORDS_BETWEEN_BYTES_READ_METRIC_UPDATES = 256 + /** * The three methods below are helpers for accessing the local map, a property of the SparkEnv of * the local process. http://git-wip-us.apache.org/repos/asf/spark/blob/dea302dd/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala index 0cccdef..3245632 100644 --- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala @@ -25,6 +25,7 @@ import scala.reflect.ClassTag import org.apache.hadoop.conf.{Configurable, Configuration} import org.apache.hadoop.io.Writable import org.apache.hadoop.mapreduce._ +import org.apache.hadoop.mapreduce.lib.input.FileSplit import org.apache.spark.annotation.DeveloperApi import org.apache.spark.input.WholeTextFileInputFormat @@ -36,6 +37,7 @@ import org.apache.spark.{SparkContext, TaskContext} import org.apache.spark.executor.{DataReadMethod, InputMetrics} import org.apache.spark.rdd.NewHadoopRDD.NewHadoopMapPartitionsWithSplitRDD import org.apache.spark.util.Utils +import org.apache.spark.deploy.SparkHadoopUtil private[spark] class NewHadoopPartition( rddId: Int, @@ -118,21 +120,22 @@ class NewHadoopRDD[K, V]( reader.initialize(split.serializableHadoopSplit.value, hadoopAttemptContext) val inputMetrics = new InputMetrics(DataReadMethod.Hadoop) - try { - /* bytesRead may not exactly equal the bytes read by a task: split boundaries aren't - * always at record boundaries, so tasks may need to read into other splits to complete - * a record. */ - inputMetrics.bytesRead = split.serializableHadoopSplit.value.getLength() - } catch { - case e: Exception => - logWarning("Unable to get input split size in order to set task input bytes", e) + // Find a function that will return the FileSystem bytes read by this thread. + val bytesReadCallback = if (split.serializableHadoopSplit.value.isInstanceOf[FileSplit]) { + SparkHadoopUtil.get.getFSBytesReadOnThreadCallback( + split.serializableHadoopSplit.value.asInstanceOf[FileSplit].getPath, conf) + } else { + None + } + if (bytesReadCallback.isDefined) { + context.taskMetrics.inputMetrics = Some(inputMetrics) } - context.taskMetrics.inputMetrics = Some(inputMetrics) // Register an on-task-completion callback to close the input stream. context.addTaskCompletionListener(context => close()) var havePair = false var finished = false + var recordsSinceMetricsUpdate = 0 override def hasNext: Boolean = { if (!finished && !havePair) { @@ -147,12 +150,39 @@ class NewHadoopRDD[K, V]( throw new java.util.NoSuchElementException("End of stream") } havePair = false + + // Update bytes read metric every few records + if (recordsSinceMetricsUpdate == HadoopRDD.RECORDS_BETWEEN_BYTES_READ_METRIC_UPDATES + && bytesReadCallback.isDefined) { + recordsSinceMetricsUpdate = 0 + val bytesReadFn = bytesReadCallback.get + inputMetrics.bytesRead = bytesReadFn() + } else { + recordsSinceMetricsUpdate += 1 + } + (reader.getCurrentKey, reader.getCurrentValue) } private def close() { try { reader.close() + + // Update metrics with final amount + if (bytesReadCallback.isDefined) { + val bytesReadFn = bytesReadCallback.get + inputMetrics.bytesRead = bytesReadFn() + } else if (split.serializableHadoopSplit.value.isInstanceOf[FileSplit]) { + // If we can't get the bytes read from the FS stats, fall back to the split size, + // which may be inaccurate. + try { + inputMetrics.bytesRead = split.serializableHadoopSplit.value.getLength + context.taskMetrics.inputMetrics = Some(inputMetrics) + } catch { + case e: java.io.IOException => + logWarning("Unable to get input size to set InputMetrics for task", e) + } + } } catch { case e: Exception => { if (!Utils.inShutdown()) { http://git-wip-us.apache.org/repos/asf/spark/blob/dea302dd/core/src/main/scala/org/apache/spark/util/Utils.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 84ed5db..93ac9f1 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -1673,6 +1673,17 @@ private[spark] object Utils extends Logging { PropertyConfigurator.configure(pro) } + def invoke( + clazz: Class[_], + obj: AnyRef, + methodName: String, + args: (Class[_], AnyRef)*): AnyRef = { + val (types, values) = args.unzip + val method = clazz.getDeclaredMethod(methodName, types: _*) + method.setAccessible(true) + method.invoke(obj, values.toSeq: _*) + } + } /** http://git-wip-us.apache.org/repos/asf/spark/blob/dea302dd/core/src/test/scala/org/apache/spark/metrics/InputMetricsSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/metrics/InputMetricsSuite.scala b/core/src/test/scala/org/apache/spark/metrics/InputMetricsSuite.scala new file mode 100644 index 0000000..33bd1af --- /dev/null +++ b/core/src/test/scala/org/apache/spark/metrics/InputMetricsSuite.scala @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.metrics + +import org.scalatest.FunSuite + +import org.apache.spark.SharedSparkContext +import org.apache.spark.scheduler.{SparkListenerTaskEnd, SparkListener} + +import scala.collection.mutable.ArrayBuffer + +import java.io.{FileWriter, PrintWriter, File} + +class InputMetricsSuite extends FunSuite with SharedSparkContext { + test("input metrics when reading text file") { + val file = new File(getClass.getSimpleName + ".txt") + val pw = new PrintWriter(new FileWriter(file)) + pw.println("some stuff") + pw.println("some other stuff") + pw.println("yet more stuff") + pw.println("too much stuff") + pw.close() + file.deleteOnExit() + + val taskBytesRead = new ArrayBuffer[Long]() + sc.addSparkListener(new SparkListener() { + override def onTaskEnd(taskEnd: SparkListenerTaskEnd) { + taskBytesRead += taskEnd.taskMetrics.inputMetrics.get.bytesRead + } + }) + sc.textFile("file://" + file.getAbsolutePath, 2).count() + + // Wait for task end events to come in + sc.listenerBus.waitUntilEmpty(500) + assert(taskBytesRead.length == 2) + assert(taskBytesRead.sum == file.length()) + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
