[SPARK-12481][CORE][STREAMING][SQL] Remove usage of Hadoop deprecated APIs and reflection that supported 1.x
Remove use of deprecated Hadoop APIs now that 2.2+ is required Author: Sean Owen <so...@cloudera.com> Closes #10446 from srowen/SPARK-12481. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/15bd7362 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/15bd7362 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/15bd7362 Branch: refs/heads/master Commit: 15bd73627e04591fd13667b4838c9098342db965 Parents: 94f7a12 Author: Sean Owen <so...@cloudera.com> Authored: Sat Jan 2 13:15:53 2016 +0000 Committer: Sean Owen <so...@cloudera.com> Committed: Sat Jan 2 13:15:53 2016 +0000 ---------------------------------------------------------------------- .../scala/org/apache/spark/SparkContext.scala | 16 ++--- .../org/apache/spark/SparkHadoopWriter.scala | 20 +++--- .../apache/spark/deploy/SparkHadoopUtil.scala | 41 ++---------- .../deploy/history/FsHistoryProvider.scala | 16 ++--- .../input/FixedLengthBinaryInputFormat.scala | 3 +- .../input/FixedLengthBinaryRecordReader.scala | 7 +- .../apache/spark/input/PortableDataStream.scala | 7 +- .../spark/input/WholeTextFileInputFormat.scala | 2 +- .../spark/input/WholeTextFileRecordReader.scala | 5 +- .../spark/mapred/SparkHadoopMapRedUtil.scala | 51 +------------- .../mapreduce/SparkHadoopMapReduceUtil.scala | 68 ------------------- .../org/apache/spark/rdd/BinaryFileRDD.scala | 4 +- .../scala/org/apache/spark/rdd/HadoopRDD.scala | 3 +- .../org/apache/spark/rdd/NewHadoopRDD.scala | 12 ++-- .../org/apache/spark/rdd/PairRDDFunctions.scala | 26 +++----- .../spark/rdd/ReliableCheckpointRDD.scala | 3 +- .../org/apache/spark/rdd/WholeTextFileRDD.scala | 3 +- .../spark/scheduler/EventLoggingListener.scala | 12 +--- .../spark/scheduler/InputFormatInfo.scala | 2 +- .../apache/spark/util/ShutdownHookManager.scala | 19 +----- .../java/org/apache/spark/JavaAPISuite.java | 2 +- .../test/scala/org/apache/spark/FileSuite.scala | 9 ++- .../scheduler/EventLoggingListenerSuite.scala | 4 +- .../spark/scheduler/ReplayListenerSuite.scala | 3 +- .../spark/examples/CassandraCQLTest.scala | 4 +- .../apache/spark/examples/CassandraTest.scala | 4 +- project/MimaExcludes.scala | 5 ++ scalastyle-config.xml | 8 --- .../InsertIntoHadoopFsRelation.scala | 2 +- .../execution/datasources/SqlNewHadoopRDD.scala | 18 +++-- .../execution/datasources/WriterContainer.scala | 27 +++----- .../datasources/json/JSONRelation.scala | 12 ++-- .../parquet/CatalystReadSupport.scala | 2 - .../parquet/DirectParquetOutputCommitter.scala | 6 +- .../datasources/parquet/ParquetRelation.scala | 20 +++--- .../datasources/text/DefaultSource.scala | 13 ++-- .../apache/spark/sql/sources/interfaces.scala | 8 +-- .../org/apache/spark/sql/hive/HiveContext.scala | 2 +- .../spark/sql/hive/client/ClientWrapper.scala | 70 -------------------- .../spark/sql/hive/hiveWriterContainers.scala | 13 ++-- .../spark/sql/hive/orc/OrcFileOperator.scala | 2 +- .../apache/spark/sql/hive/orc/OrcRelation.scala | 16 ++--- .../spark/sql/sources/SimpleTextRelation.scala | 5 +- .../streaming/util/FileBasedWriteAheadLog.scala | 3 +- .../util/FileBasedWriteAheadLogWriter.scala | 10 +-- .../streaming/util/WriteAheadLogSuite.scala | 3 +- 46 files changed, 150 insertions(+), 441 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/15bd7362/core/src/main/scala/org/apache/spark/SparkContext.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index bbdc915..77e44ee 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -874,11 +874,11 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, String)] = withScope { assertNotStopped() - val job = new NewHadoopJob(hadoopConfiguration) + val job = NewHadoopJob.getInstance(hadoopConfiguration) // Use setInputPaths so that wholeTextFiles aligns with hadoopFile/textFile in taking // comma separated files as input. (see SPARK-7155) NewFileInputFormat.setInputPaths(job, path) - val updateConf = SparkHadoopUtil.get.getConfigurationFromJobContext(job) + val updateConf = job.getConfiguration new WholeTextFileRDD( this, classOf[WholeTextFileInputFormat], @@ -923,11 +923,11 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, PortableDataStream)] = withScope { assertNotStopped() - val job = new NewHadoopJob(hadoopConfiguration) + val job = NewHadoopJob.getInstance(hadoopConfiguration) // Use setInputPaths so that binaryFiles aligns with hadoopFile/textFile in taking // comma separated files as input. (see SPARK-7155) NewFileInputFormat.setInputPaths(job, path) - val updateConf = SparkHadoopUtil.get.getConfigurationFromJobContext(job) + val updateConf = job.getConfiguration new BinaryFileRDD( this, classOf[StreamInputFormat], @@ -1100,13 +1100,13 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli vClass: Class[V], conf: Configuration = hadoopConfiguration): RDD[(K, V)] = withScope { assertNotStopped() - // The call to new NewHadoopJob automatically adds security credentials to conf, + // The call to NewHadoopJob automatically adds security credentials to conf, // so we don't need to explicitly add them ourselves - val job = new NewHadoopJob(conf) + val job = NewHadoopJob.getInstance(conf) // Use setInputPaths so that newAPIHadoopFile aligns with hadoopFile/textFile in taking // comma separated files as input. (see SPARK-7155) NewFileInputFormat.setInputPaths(job, path) - val updatedConf = SparkHadoopUtil.get.getConfigurationFromJobContext(job) + val updatedConf = job.getConfiguration new NewHadoopRDD(this, fClass, kClass, vClass, updatedConf).setName(path) } @@ -1369,7 +1369,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli if (!fs.exists(hadoopPath)) { throw new FileNotFoundException(s"Added file $hadoopPath does not exist.") } - val isDir = fs.getFileStatus(hadoopPath).isDir + val isDir = fs.getFileStatus(hadoopPath).isDirectory if (!isLocal && scheme == "file" && isDir) { throw new SparkException(s"addFile does not support local directories when not running " + "local mode.") http://git-wip-us.apache.org/repos/asf/spark/blob/15bd7362/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala index ac6eaab..dd400b8 100644 --- a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala +++ b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala @@ -25,6 +25,7 @@ import java.util.Date import org.apache.hadoop.mapred._ import org.apache.hadoop.fs.FileSystem import org.apache.hadoop.fs.Path +import org.apache.hadoop.mapreduce.TaskType import org.apache.spark.mapred.SparkHadoopMapRedUtil import org.apache.spark.rdd.HadoopRDD @@ -37,10 +38,7 @@ import org.apache.spark.util.SerializableJobConf * a filename to write to, etc, exactly like in a Hadoop MapReduce job. */ private[spark] -class SparkHadoopWriter(jobConf: JobConf) - extends Logging - with SparkHadoopMapRedUtil - with Serializable { +class SparkHadoopWriter(jobConf: JobConf) extends Logging with Serializable { private val now = new Date() private val conf = new SerializableJobConf(jobConf) @@ -131,7 +129,7 @@ class SparkHadoopWriter(jobConf: JobConf) private def getJobContext(): JobContext = { if (jobContext == null) { - jobContext = newJobContext(conf.value, jID.value) + jobContext = new JobContextImpl(conf.value, jID.value) } jobContext } @@ -143,6 +141,12 @@ class SparkHadoopWriter(jobConf: JobConf) taskContext } + protected def newTaskAttemptContext( + conf: JobConf, + attemptId: TaskAttemptID): TaskAttemptContext = { + new TaskAttemptContextImpl(conf, attemptId) + } + private def setIDs(jobid: Int, splitid: Int, attemptid: Int) { jobID = jobid splitID = splitid @@ -150,7 +154,7 @@ class SparkHadoopWriter(jobConf: JobConf) jID = new SerializableWritable[JobID](SparkHadoopWriter.createJobID(now, jobid)) taID = new SerializableWritable[TaskAttemptID]( - new TaskAttemptID(new TaskID(jID.value, true, splitID), attemptID)) + new TaskAttemptID(new TaskID(jID.value, TaskType.MAP, splitID), attemptID)) } } @@ -168,9 +172,9 @@ object SparkHadoopWriter { } val outputPath = new Path(path) val fs = outputPath.getFileSystem(conf) - if (outputPath == null || fs == null) { + if (fs == null) { throw new IllegalArgumentException("Incorrectly formatted output path") } - outputPath.makeQualified(fs) + outputPath.makeQualified(fs.getUri, fs.getWorkingDirectory) } } http://git-wip-us.apache.org/repos/asf/spark/blob/15bd7362/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala index 59e9056..4bd94f1 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala @@ -33,9 +33,6 @@ import org.apache.hadoop.fs.FileSystem.Statistics import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter} import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier import org.apache.hadoop.mapred.JobConf -import org.apache.hadoop.mapreduce.JobContext -import org.apache.hadoop.mapreduce.{TaskAttemptContext => MapReduceTaskAttemptContext} -import org.apache.hadoop.mapreduce.{TaskAttemptID => MapReduceTaskAttemptID} import org.apache.hadoop.security.{Credentials, UserGroupInformation} import org.apache.spark.annotation.DeveloperApi @@ -76,9 +73,6 @@ class SparkHadoopUtil extends Logging { } } - @deprecated("use newConfiguration with SparkConf argument", "1.2.0") - def newConfiguration(): Configuration = newConfiguration(null) - /** * Return an appropriate (subclass) of Configuration. Creating config can initializes some Hadoop * subsystems. @@ -191,33 +185,6 @@ class SparkHadoopUtil extends Logging { } /** - * Using reflection to get the Configuration from JobContext/TaskAttemptContext. If we directly - * call `JobContext/TaskAttemptContext.getConfiguration`, it will generate different byte codes - * for Hadoop 1.+ and Hadoop 2.+ because JobContext/TaskAttemptContext is class in Hadoop 1.+ - * while it's interface in Hadoop 2.+. - */ - def getConfigurationFromJobContext(context: JobContext): Configuration = { - // scalastyle:off jobconfig - val method = context.getClass.getMethod("getConfiguration") - // scalastyle:on jobconfig - method.invoke(context).asInstanceOf[Configuration] - } - - /** - * Using reflection to call `getTaskAttemptID` from TaskAttemptContext. If we directly - * call `TaskAttemptContext.getTaskAttemptID`, it will generate different byte codes - * for Hadoop 1.+ and Hadoop 2.+ because TaskAttemptContext is class in Hadoop 1.+ - * while it's interface in Hadoop 2.+. - */ - def getTaskAttemptIDFromTaskAttemptContext( - context: MapReduceTaskAttemptContext): MapReduceTaskAttemptID = { - // scalastyle:off jobconfig - val method = context.getClass.getMethod("getTaskAttemptID") - // scalastyle:on jobconfig - method.invoke(context).asInstanceOf[MapReduceTaskAttemptID] - } - - /** * Get [[FileStatus]] objects for all leaf children (files) under the given base path. If the * given path points to a file, return a single-element collection containing [[FileStatus]] of * that file. @@ -233,11 +200,11 @@ class SparkHadoopUtil extends Logging { */ def listLeafStatuses(fs: FileSystem, baseStatus: FileStatus): Seq[FileStatus] = { def recurse(status: FileStatus): Seq[FileStatus] = { - val (directories, leaves) = fs.listStatus(status.getPath).partition(_.isDir) + val (directories, leaves) = fs.listStatus(status.getPath).partition(_.isDirectory) leaves ++ directories.flatMap(f => listLeafStatuses(fs, f)) } - if (baseStatus.isDir) recurse(baseStatus) else Seq(baseStatus) + if (baseStatus.isDirectory) recurse(baseStatus) else Seq(baseStatus) } def listLeafDirStatuses(fs: FileSystem, basePath: Path): Seq[FileStatus] = { @@ -246,12 +213,12 @@ class SparkHadoopUtil extends Logging { def listLeafDirStatuses(fs: FileSystem, baseStatus: FileStatus): Seq[FileStatus] = { def recurse(status: FileStatus): Seq[FileStatus] = { - val (directories, files) = fs.listStatus(status.getPath).partition(_.isDir) + val (directories, files) = fs.listStatus(status.getPath).partition(_.isDirectory) val leaves = if (directories.isEmpty) Seq(status) else Seq.empty[FileStatus] leaves ++ directories.flatMap(dir => listLeafDirStatuses(fs, dir)) } - assert(baseStatus.isDir) + assert(baseStatus.isDirectory) recurse(baseStatus) } http://git-wip-us.apache.org/repos/asf/spark/blob/15bd7362/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index 6e91d73..c93bc8c 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -28,6 +28,7 @@ import com.google.common.io.ByteStreams import com.google.common.util.concurrent.{MoreExecutors, ThreadFactoryBuilder} import org.apache.hadoop.fs.{FileStatus, FileSystem, Path} import org.apache.hadoop.hdfs.DistributedFileSystem +import org.apache.hadoop.hdfs.protocol.HdfsConstants import org.apache.hadoop.security.AccessControlException import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException} @@ -167,7 +168,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) } throw new IllegalArgumentException(msg) } - if (!fs.getFileStatus(path).isDir) { + if (!fs.getFileStatus(path).isDirectory) { throw new IllegalArgumentException( "Logging directory specified is not a directory: %s".format(logDir)) } @@ -304,7 +305,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) logError("Exception encountered when attempting to update last scan time", e) lastScanTime } finally { - if (!fs.delete(path)) { + if (!fs.delete(path, true)) { logWarning(s"Error deleting ${path}") } } @@ -603,7 +604,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) * As of Spark 1.3, these files are consolidated into a single one that replaces the directory. * See SPARK-2261 for more detail. */ - private def isLegacyLogDirectory(entry: FileStatus): Boolean = entry.isDir() + private def isLegacyLogDirectory(entry: FileStatus): Boolean = entry.isDirectory /** * Returns the modification time of the given event log. If the status points at an empty @@ -648,8 +649,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) } /** - * Checks whether HDFS is in safe mode. The API is slightly different between hadoop 1 and 2, - * so we have to resort to ugly reflection (as usual...). + * Checks whether HDFS is in safe mode. * * Note that DistributedFileSystem is a `@LimitedPrivate` class, which for all practical reasons * makes it more public than not. @@ -663,11 +663,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) // For testing. private[history] def isFsInSafeMode(dfs: DistributedFileSystem): Boolean = { - val hadoop2Class = "org.apache.hadoop.hdfs.protocol.HdfsConstants$SafeModeAction" - val actionClass: Class[_] = getClass().getClassLoader().loadClass(hadoop2Class) - val action = actionClass.getField("SAFEMODE_GET").get(null) - val method = dfs.getClass().getMethod("setSafeMode", action.getClass()) - method.invoke(dfs, action).asInstanceOf[Boolean] + dfs.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_GET) } } http://git-wip-us.apache.org/repos/asf/spark/blob/15bd7362/core/src/main/scala/org/apache/spark/input/FixedLengthBinaryInputFormat.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/input/FixedLengthBinaryInputFormat.scala b/core/src/main/scala/org/apache/spark/input/FixedLengthBinaryInputFormat.scala index 532850d..30431a9 100644 --- a/core/src/main/scala/org/apache/spark/input/FixedLengthBinaryInputFormat.scala +++ b/core/src/main/scala/org/apache/spark/input/FixedLengthBinaryInputFormat.scala @@ -23,7 +23,6 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat import org.apache.hadoop.mapreduce.{InputSplit, JobContext, RecordReader, TaskAttemptContext} import org.apache.spark.Logging -import org.apache.spark.deploy.SparkHadoopUtil /** * Custom Input Format for reading and splitting flat binary files that contain records, @@ -36,7 +35,7 @@ private[spark] object FixedLengthBinaryInputFormat { /** Retrieves the record length property from a Hadoop configuration */ def getRecordLength(context: JobContext): Int = { - SparkHadoopUtil.get.getConfigurationFromJobContext(context).get(RECORD_LENGTH_PROPERTY).toInt + context.getConfiguration.get(RECORD_LENGTH_PROPERTY).toInt } } http://git-wip-us.apache.org/repos/asf/spark/blob/15bd7362/core/src/main/scala/org/apache/spark/input/FixedLengthBinaryRecordReader.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/input/FixedLengthBinaryRecordReader.scala b/core/src/main/scala/org/apache/spark/input/FixedLengthBinaryRecordReader.scala index 67a9692..25596a1 100644 --- a/core/src/main/scala/org/apache/spark/input/FixedLengthBinaryRecordReader.scala +++ b/core/src/main/scala/org/apache/spark/input/FixedLengthBinaryRecordReader.scala @@ -24,7 +24,6 @@ import org.apache.hadoop.io.compress.CompressionCodecFactory import org.apache.hadoop.io.{BytesWritable, LongWritable} import org.apache.hadoop.mapreduce.{InputSplit, RecordReader, TaskAttemptContext} import org.apache.hadoop.mapreduce.lib.input.FileSplit -import org.apache.spark.deploy.SparkHadoopUtil /** * FixedLengthBinaryRecordReader is returned by FixedLengthBinaryInputFormat. @@ -83,16 +82,16 @@ private[spark] class FixedLengthBinaryRecordReader // the actual file we will be reading from val file = fileSplit.getPath // job configuration - val job = SparkHadoopUtil.get.getConfigurationFromJobContext(context) + val conf = context.getConfiguration // check compression - val codec = new CompressionCodecFactory(job).getCodec(file) + val codec = new CompressionCodecFactory(conf).getCodec(file) if (codec != null) { throw new IOException("FixedLengthRecordReader does not support reading compressed files") } // get the record length recordLength = FixedLengthBinaryInputFormat.getRecordLength(context) // get the filesystem - val fs = file.getFileSystem(job) + val fs = file.getFileSystem(conf) // open the File fileInputStream = fs.open(file) // seek to the splitStart position http://git-wip-us.apache.org/repos/asf/spark/blob/15bd7362/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala b/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala index 280e7a5..cb76e3c 100644 --- a/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala +++ b/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala @@ -27,8 +27,6 @@ import org.apache.hadoop.fs.Path import org.apache.hadoop.mapreduce.{InputSplit, JobContext, RecordReader, TaskAttemptContext} import org.apache.hadoop.mapreduce.lib.input.{CombineFileInputFormat, CombineFileRecordReader, CombineFileSplit} -import org.apache.spark.deploy.SparkHadoopUtil - /** * A general format for reading whole files in as streams, byte arrays, * or other functions to be added @@ -44,7 +42,7 @@ private[spark] abstract class StreamFileInputFormat[T] */ def setMinPartitions(context: JobContext, minPartitions: Int) { val files = listStatus(context).asScala - val totalLen = files.map(file => if (file.isDir) 0L else file.getLen).sum + val totalLen = files.map(file => if (file.isDirectory) 0L else file.getLen).sum val maxSplitSize = Math.ceil(totalLen * 1.0 / files.size).toLong super.setMaxSplitSize(maxSplitSize) } @@ -135,8 +133,7 @@ class PortableDataStream( private val confBytes = { val baos = new ByteArrayOutputStream() - SparkHadoopUtil.get.getConfigurationFromJobContext(context). - write(new DataOutputStream(baos)) + context.getConfiguration.write(new DataOutputStream(baos)) baos.toByteArray } http://git-wip-us.apache.org/repos/asf/spark/blob/15bd7362/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala b/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala index 4134087..fa34f1e 100644 --- a/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala +++ b/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala @@ -53,7 +53,7 @@ private[spark] class WholeTextFileInputFormat */ def setMinPartitions(context: JobContext, minPartitions: Int) { val files = listStatus(context).asScala - val totalLen = files.map(file => if (file.isDir) 0L else file.getLen).sum + val totalLen = files.map(file => if (file.isDirectory) 0L else file.getLen).sum val maxSplitSize = Math.ceil(totalLen * 1.0 / (if (minPartitions == 0) 1 else minPartitions)).toLong super.setMaxSplitSize(maxSplitSize) http://git-wip-us.apache.org/repos/asf/spark/blob/15bd7362/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala b/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala index b56b2aa..998c898 100644 --- a/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala +++ b/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala @@ -26,8 +26,6 @@ import org.apache.hadoop.mapreduce.InputSplit import org.apache.hadoop.mapreduce.lib.input.{CombineFileSplit, CombineFileRecordReader} import org.apache.hadoop.mapreduce.RecordReader import org.apache.hadoop.mapreduce.TaskAttemptContext -import org.apache.spark.deploy.SparkHadoopUtil - /** * A trait to implement [[org.apache.hadoop.conf.Configurable Configurable]] interface. @@ -52,8 +50,7 @@ private[spark] class WholeTextFileRecordReader( extends RecordReader[Text, Text] with Configurable { private[this] val path = split.getPath(index) - private[this] val fs = path.getFileSystem( - SparkHadoopUtil.get.getConfigurationFromJobContext(context)) + private[this] val fs = path.getFileSystem(context.getConfiguration) // True means the current file has been processed, then skip it. private[this] var processed = false http://git-wip-us.apache.org/repos/asf/spark/blob/15bd7362/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala b/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala index f7298e8..249bdf5 100644 --- a/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala +++ b/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala @@ -18,61 +18,12 @@ package org.apache.spark.mapred import java.io.IOException -import java.lang.reflect.Modifier -import org.apache.hadoop.mapred._ import org.apache.hadoop.mapreduce.{TaskAttemptContext => MapReduceTaskAttemptContext} import org.apache.hadoop.mapreduce.{OutputCommitter => MapReduceOutputCommitter} -import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.executor.CommitDeniedException import org.apache.spark.{Logging, SparkEnv, TaskContext} -import org.apache.spark.util.{Utils => SparkUtils} - -private[spark] -trait SparkHadoopMapRedUtil { - def newJobContext(conf: JobConf, jobId: JobID): JobContext = { - val klass = firstAvailableClass("org.apache.hadoop.mapred.JobContextImpl", - "org.apache.hadoop.mapred.JobContext") - val ctor = klass.getDeclaredConstructor(classOf[JobConf], - classOf[org.apache.hadoop.mapreduce.JobID]) - // In Hadoop 1.0.x, JobContext is an interface, and JobContextImpl is package private. - // Make it accessible if it's not in order to access it. - if (!Modifier.isPublic(ctor.getModifiers)) { - ctor.setAccessible(true) - } - ctor.newInstance(conf, jobId).asInstanceOf[JobContext] - } - - def newTaskAttemptContext(conf: JobConf, attemptId: TaskAttemptID): TaskAttemptContext = { - val klass = firstAvailableClass("org.apache.hadoop.mapred.TaskAttemptContextImpl", - "org.apache.hadoop.mapred.TaskAttemptContext") - val ctor = klass.getDeclaredConstructor(classOf[JobConf], classOf[TaskAttemptID]) - // See above - if (!Modifier.isPublic(ctor.getModifiers)) { - ctor.setAccessible(true) - } - ctor.newInstance(conf, attemptId).asInstanceOf[TaskAttemptContext] - } - - def newTaskAttemptID( - jtIdentifier: String, - jobId: Int, - isMap: Boolean, - taskId: Int, - attemptId: Int): TaskAttemptID = { - new TaskAttemptID(jtIdentifier, jobId, isMap, taskId, attemptId) - } - - private def firstAvailableClass(first: String, second: String): Class[_] = { - try { - SparkUtils.classForName(first) - } catch { - case e: ClassNotFoundException => - SparkUtils.classForName(second) - } - } -} object SparkHadoopMapRedUtil extends Logging { /** @@ -93,7 +44,7 @@ object SparkHadoopMapRedUtil extends Logging { jobId: Int, splitId: Int): Unit = { - val mrTaskAttemptID = SparkHadoopUtil.get.getTaskAttemptIDFromTaskAttemptContext(mrTaskContext) + val mrTaskAttemptID = mrTaskContext.getTaskAttemptID // Called after we have decided to commit def performCommit(): Unit = { http://git-wip-us.apache.org/repos/asf/spark/blob/15bd7362/core/src/main/scala/org/apache/spark/mapreduce/SparkHadoopMapReduceUtil.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/mapreduce/SparkHadoopMapReduceUtil.scala b/core/src/main/scala/org/apache/spark/mapreduce/SparkHadoopMapReduceUtil.scala deleted file mode 100644 index 82d807f..0000000 --- a/core/src/main/scala/org/apache/spark/mapreduce/SparkHadoopMapReduceUtil.scala +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.mapreduce - -import java.lang.{Boolean => JBoolean, Integer => JInteger} - -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.mapreduce.{JobContext, JobID, TaskAttemptContext, TaskAttemptID} -import org.apache.spark.util.Utils - -private[spark] -trait SparkHadoopMapReduceUtil { - def newJobContext(conf: Configuration, jobId: JobID): JobContext = { - val klass = Utils.classForName("org.apache.hadoop.mapreduce.task.JobContextImpl") - val ctor = klass.getDeclaredConstructor(classOf[Configuration], classOf[JobID]) - ctor.newInstance(conf, jobId).asInstanceOf[JobContext] - } - - def newTaskAttemptContext(conf: Configuration, attemptId: TaskAttemptID): TaskAttemptContext = { - val klass = Utils.classForName("org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl") - val ctor = klass.getDeclaredConstructor(classOf[Configuration], classOf[TaskAttemptID]) - ctor.newInstance(conf, attemptId).asInstanceOf[TaskAttemptContext] - } - - def newTaskAttemptID( - jtIdentifier: String, - jobId: Int, - isMap: Boolean, - taskId: Int, - attemptId: Int): TaskAttemptID = { - val klass = Utils.classForName("org.apache.hadoop.mapreduce.TaskAttemptID") - try { - // First, attempt to use the old-style constructor that takes a boolean isMap - // (not available in YARN) - val ctor = klass.getDeclaredConstructor(classOf[String], classOf[Int], classOf[Boolean], - classOf[Int], classOf[Int]) - ctor.newInstance(jtIdentifier, new JInteger(jobId), new JBoolean(isMap), new JInteger(taskId), - new JInteger(attemptId)).asInstanceOf[TaskAttemptID] - } catch { - case exc: NoSuchMethodException => { - // If that failed, look for the new constructor that takes a TaskType (not available in 1.x) - val taskTypeClass = Utils.classForName("org.apache.hadoop.mapreduce.TaskType") - .asInstanceOf[Class[Enum[_]]] - val taskType = taskTypeClass.getMethod("valueOf", classOf[String]).invoke( - taskTypeClass, if (isMap) "MAP" else "REDUCE") - val ctor = klass.getDeclaredConstructor(classOf[String], classOf[Int], taskTypeClass, - classOf[Int], classOf[Int]) - ctor.newInstance(jtIdentifier, new JInteger(jobId), taskType, new JInteger(taskId), - new JInteger(attemptId)).asInstanceOf[TaskAttemptID] - } - } - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/15bd7362/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala b/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala index aedced7..2bf2337 100644 --- a/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala @@ -20,6 +20,8 @@ package org.apache.spark.rdd import org.apache.hadoop.conf.{ Configurable, Configuration } import org.apache.hadoop.io.Writable import org.apache.hadoop.mapreduce._ +import org.apache.hadoop.mapreduce.task.JobContextImpl + import org.apache.spark.input.StreamFileInputFormat import org.apache.spark.{ Partition, SparkContext } @@ -40,7 +42,7 @@ private[spark] class BinaryFileRDD[T]( configurable.setConf(conf) case _ => } - val jobContext = newJobContext(conf, jobId) + val jobContext = new JobContextImpl(conf, jobId) inputFormat.setMinPartitions(jobContext, minPartitions) val rawSplits = inputFormat.getSplits(jobContext).toArray val result = new Array[Partition](rawSplits.size) http://git-wip-us.apache.org/repos/asf/spark/blob/15bd7362/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala index f37c95b..920d3bf 100644 --- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala @@ -36,6 +36,7 @@ import org.apache.hadoop.mapred.JobID import org.apache.hadoop.mapred.TaskAttemptID import org.apache.hadoop.mapred.TaskID import org.apache.hadoop.mapred.lib.CombineFileSplit +import org.apache.hadoop.mapreduce.TaskType import org.apache.hadoop.util.ReflectionUtils import org.apache.spark._ @@ -357,7 +358,7 @@ private[spark] object HadoopRDD extends Logging { def addLocalConfiguration(jobTrackerId: String, jobId: Int, splitId: Int, attemptId: Int, conf: JobConf) { val jobID = new JobID(jobTrackerId, jobId) - val taId = new TaskAttemptID(new TaskID(jobID, true, splitId), attemptId) + val taId = new TaskAttemptID(new TaskID(jobID, TaskType.MAP, splitId), attemptId) conf.set("mapred.tip.id", taId.getTaskID.toString) conf.set("mapred.task.id", taId.toString) http://git-wip-us.apache.org/repos/asf/spark/blob/15bd7362/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala index 86f38ae..8b330a3 100644 --- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala @@ -26,11 +26,11 @@ import org.apache.hadoop.conf.{Configurable, Configuration} import org.apache.hadoop.io.Writable import org.apache.hadoop.mapreduce._ import org.apache.hadoop.mapreduce.lib.input.{CombineFileSplit, FileSplit} +import org.apache.hadoop.mapreduce.task.{JobContextImpl, TaskAttemptContextImpl} import org.apache.spark.annotation.DeveloperApi import org.apache.spark._ import org.apache.spark.executor.DataReadMethod -import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil import org.apache.spark.rdd.NewHadoopRDD.NewHadoopMapPartitionsWithSplitRDD import org.apache.spark.util.{SerializableConfiguration, ShutdownHookManager} import org.apache.spark.deploy.SparkHadoopUtil @@ -66,9 +66,7 @@ class NewHadoopRDD[K, V]( keyClass: Class[K], valueClass: Class[V], @transient private val _conf: Configuration) - extends RDD[(K, V)](sc, Nil) - with SparkHadoopMapReduceUtil - with Logging { + extends RDD[(K, V)](sc, Nil) with Logging { // A Hadoop Configuration can be about 10 KB, which is pretty big, so broadcast it private val confBroadcast = sc.broadcast(new SerializableConfiguration(_conf)) @@ -109,7 +107,7 @@ class NewHadoopRDD[K, V]( configurable.setConf(_conf) case _ => } - val jobContext = newJobContext(_conf, jobId) + val jobContext = new JobContextImpl(_conf, jobId) val rawSplits = inputFormat.getSplits(jobContext).toArray val result = new Array[Partition](rawSplits.size) for (i <- 0 until rawSplits.size) { @@ -144,8 +142,8 @@ class NewHadoopRDD[K, V]( configurable.setConf(conf) case _ => } - val attemptId = newTaskAttemptID(jobTrackerId, id, isMap = true, split.index, 0) - val hadoopAttemptContext = newTaskAttemptContext(conf, attemptId) + val attemptId = new TaskAttemptID(jobTrackerId, id, TaskType.MAP, split.index, 0) + val hadoopAttemptContext = new TaskAttemptContextImpl(conf, attemptId) private var reader = format.createRecordReader( split.serializableHadoopSplit.value, hadoopAttemptContext) reader.initialize(split.serializableHadoopSplit.value, hadoopAttemptContext) http://git-wip-us.apache.org/repos/asf/spark/blob/15bd7362/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala index 44d1955..b872301 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala @@ -33,15 +33,14 @@ import org.apache.hadoop.fs.FileSystem import org.apache.hadoop.io.SequenceFile.CompressionType import org.apache.hadoop.io.compress.CompressionCodec import org.apache.hadoop.mapred.{FileOutputCommitter, FileOutputFormat, JobConf, OutputFormat} -import org.apache.hadoop.mapreduce.{Job => NewAPIHadoopJob, OutputFormat => NewOutputFormat, - RecordWriter => NewRecordWriter} +import org.apache.hadoop.mapreduce.{Job => NewAPIHadoopJob, OutputFormat => NewOutputFormat, RecordWriter => NewRecordWriter, TaskType, TaskAttemptID} +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl import org.apache.spark._ import org.apache.spark.Partitioner.defaultPartitioner import org.apache.spark.annotation.Experimental import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.executor.{DataWriteMethod, OutputMetrics} -import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil import org.apache.spark.partial.{BoundedDouble, PartialResult} import org.apache.spark.serializer.Serializer import org.apache.spark.util.{SerializableConfiguration, Utils} @@ -53,10 +52,7 @@ import org.apache.spark.util.random.StratifiedSamplingUtils */ class PairRDDFunctions[K, V](self: RDD[(K, V)]) (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null) - extends Logging - with SparkHadoopMapReduceUtil - with Serializable -{ + extends Logging with Serializable { /** * :: Experimental :: @@ -985,11 +981,11 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) conf: Configuration = self.context.hadoopConfiguration): Unit = self.withScope { // Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038). val hadoopConf = conf - val job = new NewAPIHadoopJob(hadoopConf) + val job = NewAPIHadoopJob.getInstance(hadoopConf) job.setOutputKeyClass(keyClass) job.setOutputValueClass(valueClass) job.setOutputFormatClass(outputFormatClass) - val jobConfiguration = SparkHadoopUtil.get.getConfigurationFromJobContext(job) + val jobConfiguration = job.getConfiguration jobConfiguration.set("mapred.output.dir", path) saveAsNewAPIHadoopDataset(jobConfiguration) } @@ -1074,11 +1070,11 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) def saveAsNewAPIHadoopDataset(conf: Configuration): Unit = self.withScope { // Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038). val hadoopConf = conf - val job = new NewAPIHadoopJob(hadoopConf) + val job = NewAPIHadoopJob.getInstance(hadoopConf) val formatter = new SimpleDateFormat("yyyyMMddHHmm") val jobtrackerID = formatter.format(new Date()) val stageId = self.id - val jobConfiguration = SparkHadoopUtil.get.getConfigurationFromJobContext(job) + val jobConfiguration = job.getConfiguration val wrappedConf = new SerializableConfiguration(jobConfiguration) val outfmt = job.getOutputFormatClass val jobFormat = outfmt.newInstance @@ -1091,9 +1087,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) val writeShard = (context: TaskContext, iter: Iterator[(K, V)]) => { val config = wrappedConf.value /* "reduce task" <split #> <attempt # = spark task #> */ - val attemptId = newTaskAttemptID(jobtrackerID, stageId, isMap = false, context.partitionId, + val attemptId = new TaskAttemptID(jobtrackerID, stageId, TaskType.REDUCE, context.partitionId, context.attemptNumber) - val hadoopContext = newTaskAttemptContext(config, attemptId) + val hadoopContext = new TaskAttemptContextImpl(config, attemptId) val format = outfmt.newInstance format match { case c: Configurable => c.setConf(config) @@ -1125,8 +1121,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) 1 } : Int - val jobAttemptId = newTaskAttemptID(jobtrackerID, stageId, isMap = true, 0, 0) - val jobTaskContext = newTaskAttemptContext(wrappedConf.value, jobAttemptId) + val jobAttemptId = new TaskAttemptID(jobtrackerID, stageId, TaskType.MAP, 0, 0) + val jobTaskContext = new TaskAttemptContextImpl(wrappedConf.value, jobAttemptId) val jobCommitter = jobFormat.getOutputCommitter(jobTaskContext) // When speculation is on and output committer class name contains "Direct", we should warn http://git-wip-us.apache.org/repos/asf/spark/blob/15bd7362/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala index fa71b8c..a9b3d52 100644 --- a/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala @@ -174,7 +174,8 @@ private[spark] object ReliableCheckpointRDD extends Logging { fs.create(tempOutputPath, false, bufferSize) } else { // This is mainly for testing purpose - fs.create(tempOutputPath, false, bufferSize, fs.getDefaultReplication, blockSize) + fs.create(tempOutputPath, false, bufferSize, + fs.getDefaultReplication(fs.getWorkingDirectory), blockSize) } val serializer = env.serializer.newInstance() val serializeStream = serializer.serializeStream(fileOutputStream) http://git-wip-us.apache.org/repos/asf/spark/blob/15bd7362/core/src/main/scala/org/apache/spark/rdd/WholeTextFileRDD.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/rdd/WholeTextFileRDD.scala b/core/src/main/scala/org/apache/spark/rdd/WholeTextFileRDD.scala index e3f14fe..8e1baae 100644 --- a/core/src/main/scala/org/apache/spark/rdd/WholeTextFileRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/WholeTextFileRDD.scala @@ -20,6 +20,7 @@ package org.apache.spark.rdd import org.apache.hadoop.conf.{Configurable, Configuration} import org.apache.hadoop.io.{Text, Writable} import org.apache.hadoop.mapreduce.InputSplit +import org.apache.hadoop.mapreduce.task.JobContextImpl import org.apache.spark.{Partition, SparkContext} import org.apache.spark.input.WholeTextFileInputFormat @@ -44,7 +45,7 @@ private[spark] class WholeTextFileRDD( configurable.setConf(conf) case _ => } - val jobContext = newJobContext(conf, jobId) + val jobContext = new JobContextImpl(conf, jobId) inputFormat.setMinPartitions(jobContext, minPartitions) val rawSplits = inputFormat.getSplits(jobContext).toArray val result = new Array[Partition](rawSplits.size) http://git-wip-us.apache.org/repos/asf/spark/blob/15bd7362/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala index eaa07ac..68792c5 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala @@ -77,14 +77,6 @@ private[spark] class EventLoggingListener( // Only defined if the file system scheme is not local private var hadoopDataStream: Option[FSDataOutputStream] = None - // The Hadoop APIs have changed over time, so we use reflection to figure out - // the correct method to use to flush a hadoop data stream. See SPARK-1518 - // for details. - private val hadoopFlushMethod = { - val cls = classOf[FSDataOutputStream] - scala.util.Try(cls.getMethod("hflush")).getOrElse(cls.getMethod("sync")) - } - private var writer: Option[PrintWriter] = None // For testing. Keep track of all JSON serialized events that have been logged. @@ -97,7 +89,7 @@ private[spark] class EventLoggingListener( * Creates the log file in the configured log directory. */ def start() { - if (!fileSystem.getFileStatus(new Path(logBaseDir)).isDir) { + if (!fileSystem.getFileStatus(new Path(logBaseDir)).isDirectory) { throw new IllegalArgumentException(s"Log directory $logBaseDir does not exist.") } @@ -147,7 +139,7 @@ private[spark] class EventLoggingListener( // scalastyle:on println if (flushLogger) { writer.foreach(_.flush()) - hadoopDataStream.foreach(hadoopFlushMethod.invoke(_)) + hadoopDataStream.foreach(_.hflush()) } if (testing) { loggedEvents += eventJson http://git-wip-us.apache.org/repos/asf/spark/blob/15bd7362/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala index 0e438ab..8235b10 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala @@ -103,7 +103,7 @@ class InputFormatInfo(val configuration: Configuration, val inputFormatClazz: Cl val instance: org.apache.hadoop.mapreduce.InputFormat[_, _] = ReflectionUtils.newInstance(inputFormatClazz.asInstanceOf[Class[_]], conf).asInstanceOf[ org.apache.hadoop.mapreduce.InputFormat[_, _]] - val job = new Job(conf) + val job = Job.getInstance(conf) val retval = new ArrayBuffer[SplitInfo]() val list = instance.getSplits(job) http://git-wip-us.apache.org/repos/asf/spark/blob/15bd7362/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala b/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala index 0065b1f..acc24ca 100644 --- a/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala +++ b/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala @@ -20,7 +20,7 @@ package org.apache.spark.util import java.io.File import java.util.PriorityQueue -import scala.util.{Failure, Success, Try} +import scala.util.Try import org.apache.hadoop.fs.FileSystem import org.apache.spark.Logging @@ -177,21 +177,8 @@ private [util] class SparkShutdownHookManager { val hookTask = new Runnable() { override def run(): Unit = runAll() } - Try(Utils.classForName("org.apache.hadoop.util.ShutdownHookManager")) match { - case Success(shmClass) => - val fsPriority = classOf[FileSystem] - .getField("SHUTDOWN_HOOK_PRIORITY") - .get(null) // static field, the value is not used - .asInstanceOf[Int] - val shm = shmClass.getMethod("get").invoke(null) - shm.getClass().getMethod("addShutdownHook", classOf[Runnable], classOf[Int]) - .invoke(shm, hookTask, Integer.valueOf(fsPriority + 30)) - - case Failure(_) => - // scalastyle:off runtimeaddshutdownhook - Runtime.getRuntime.addShutdownHook(new Thread(hookTask, "Spark Shutdown Hook")); - // scalastyle:on runtimeaddshutdownhook - } + org.apache.hadoop.util.ShutdownHookManager.get().addShutdownHook( + hookTask, FileSystem.SHUTDOWN_HOOK_PRIORITY + 30) } def runAll(): Unit = { http://git-wip-us.apache.org/repos/asf/spark/blob/15bd7362/core/src/test/java/org/apache/spark/JavaAPISuite.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java index 11f1248..d91948e 100644 --- a/core/src/test/java/org/apache/spark/JavaAPISuite.java +++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java @@ -1246,7 +1246,7 @@ public class JavaAPISuite implements Serializable { JavaPairRDD<IntWritable, Text> output = sc.newAPIHadoopFile(outputDir, org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat.class, - IntWritable.class, Text.class, new Job().getConfiguration()); + IntWritable.class, Text.class, Job.getInstance().getConfiguration()); Assert.assertEquals(pairs.toString(), output.map(new Function<Tuple2<IntWritable, Text>, String>() { @Override public String call(Tuple2<IntWritable, Text> x) { http://git-wip-us.apache.org/repos/asf/spark/blob/15bd7362/core/src/test/scala/org/apache/spark/FileSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala b/core/src/test/scala/org/apache/spark/FileSuite.scala index f6a7f43..2e47801 100644 --- a/core/src/test/scala/org/apache/spark/FileSuite.scala +++ b/core/src/test/scala/org/apache/spark/FileSuite.scala @@ -19,12 +19,11 @@ package org.apache.spark import java.io.{File, FileWriter} -import org.apache.spark.deploy.SparkHadoopUtil +import scala.io.Source + import org.apache.spark.input.PortableDataStream import org.apache.spark.storage.StorageLevel -import scala.io.Source - import org.apache.hadoop.io._ import org.apache.hadoop.io.compress.DefaultCodec import org.apache.hadoop.mapred.{JobConf, FileAlreadyExistsException, FileSplit, TextInputFormat, TextOutputFormat} @@ -506,11 +505,11 @@ class FileSuite extends SparkFunSuite with LocalSparkContext { sc = new SparkContext("local", "test") val randomRDD = sc.parallelize( Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1) - val job = new Job(sc.hadoopConfiguration) + val job = Job.getInstance(sc.hadoopConfiguration) job.setOutputKeyClass(classOf[String]) job.setOutputValueClass(classOf[String]) job.setOutputFormatClass(classOf[NewTextOutputFormat[String, String]]) - val jobConfig = SparkHadoopUtil.get.getConfigurationFromJobContext(job) + val jobConfig = job.getConfiguration jobConfig.set("mapred.output.dir", tempDir.getPath + "/outputDataset_new") randomRDD.saveAsNewAPIHadoopDataset(jobConfig) assert(new File(tempDir.getPath + "/outputDataset_new/part-r-00000").exists() === true) http://git-wip-us.apache.org/repos/asf/spark/blob/15bd7362/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala index 5cb2d42..43da6fc 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala @@ -67,11 +67,11 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit val logPath = new Path(eventLogger.logPath + EventLoggingListener.IN_PROGRESS) assert(fileSystem.exists(logPath)) val logStatus = fileSystem.getFileStatus(logPath) - assert(!logStatus.isDir) + assert(!logStatus.isDirectory) // Verify log is renamed after stop() eventLogger.stop() - assert(!fileSystem.getFileStatus(new Path(eventLogger.logPath)).isDir) + assert(!fileSystem.getFileStatus(new Path(eventLogger.logPath)).isDirectory) } test("Basic event logging") { http://git-wip-us.apache.org/repos/asf/spark/blob/15bd7362/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala index 103fc19..761e82e 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala @@ -23,7 +23,6 @@ import java.net.URI import org.json4s.jackson.JsonMethods._ import org.scalatest.BeforeAndAfter -import org.apache.spark.{SparkConf, SparkContext, SPARK_VERSION} import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite} import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.io.CompressionCodec @@ -115,7 +114,7 @@ class ReplayListenerSuite extends SparkFunSuite with BeforeAndAfter { val applications = fileSystem.listStatus(logDirPath) assert(applications != null && applications.size > 0) val eventLog = applications.sortBy(_.getModificationTime).last - assert(!eventLog.isDir) + assert(!eventLog.isDirectory) // Replay events val logData = EventLoggingListener.openEventLog(eventLog.getPath(), fileSystem) http://git-wip-us.apache.org/repos/asf/spark/blob/15bd7362/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala b/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala index d1b9b8d..5a80985 100644 --- a/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala +++ b/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala @@ -16,7 +16,6 @@ */ // scalastyle:off println - // scalastyle:off jobcontext package org.apache.spark.examples import java.nio.ByteBuffer @@ -80,7 +79,7 @@ object CassandraCQLTest { val InputColumnFamily = "ordercf" val OutputColumnFamily = "salecount" - val job = new Job() + val job = Job.getInstance() job.setInputFormatClass(classOf[CqlPagingInputFormat]) val configuration = job.getConfiguration ConfigHelper.setInputInitialAddress(job.getConfiguration(), cHost) @@ -137,4 +136,3 @@ object CassandraCQLTest { } } // scalastyle:on println -// scalastyle:on jobcontext http://git-wip-us.apache.org/repos/asf/spark/blob/15bd7362/examples/src/main/scala/org/apache/spark/examples/CassandraTest.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/spark/examples/CassandraTest.scala b/examples/src/main/scala/org/apache/spark/examples/CassandraTest.scala index 1e679bf..ad39a01 100644 --- a/examples/src/main/scala/org/apache/spark/examples/CassandraTest.scala +++ b/examples/src/main/scala/org/apache/spark/examples/CassandraTest.scala @@ -16,7 +16,6 @@ */ // scalastyle:off println -// scalastyle:off jobcontext package org.apache.spark.examples import java.nio.ByteBuffer @@ -59,7 +58,7 @@ object CassandraTest { val sc = new SparkContext(sparkConf) // Build the job configuration with ConfigHelper provided by Cassandra - val job = new Job() + val job = Job.getInstance() job.setInputFormatClass(classOf[ColumnFamilyInputFormat]) val host: String = args(1) @@ -131,7 +130,6 @@ object CassandraTest { } } // scalastyle:on println -// scalastyle:on jobcontext /* create keyspace casDemo; http://git-wip-us.apache.org/repos/asf/spark/blob/15bd7362/project/MimaExcludes.scala ---------------------------------------------------------------------- diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 59886ab..612ddf8 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -49,6 +49,11 @@ object MimaExcludes { ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.SparkContext.emptyRDD"), ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.broadcast.HttpBroadcastFactory") ) ++ + Seq( + // SPARK-12481 + ProblemFilters.exclude[IncompatibleTemplateDefProblem]( + "org.apache.spark.mapred.SparkHadoopMapRedUtil") + ) ++ // When 1.6 is officially released, update this exclusion list. Seq( MimaBuild.excludeSparkPackage("deploy"), http://git-wip-us.apache.org/repos/asf/spark/blob/15bd7362/scalastyle-config.xml ---------------------------------------------------------------------- diff --git a/scalastyle-config.xml b/scalastyle-config.xml index 16d18b3..ee855ca 100644 --- a/scalastyle-config.xml +++ b/scalastyle-config.xml @@ -187,14 +187,6 @@ This file is divided into 3 sections: scala.collection.JavaConverters._ and use .asScala / .asJava methods</customMessage> </check> - <!-- As of SPARK-10330 JobContext methods should not be called directly --> - <check customId="jobcontext" level="error" class="org.scalastyle.scalariform.TokenChecker" enabled="true"> - <parameters><parameter name="regex">^getConfiguration$|^getTaskAttemptID$</parameter></parameters> - <customMessage>Instead of calling .getConfiguration() or .getTaskAttemptID() directly, - use SparkHadoopUtil's getConfigurationFromJobContext() and getTaskAttemptIDFromTaskAttemptContext() methods. - </customMessage> - </check> - <!-- ================================================================================ --> <!-- rules we'd like to enforce, but haven't cleaned up the codebase yet --> <!-- ================================================================================ --> http://git-wip-us.apache.org/repos/asf/spark/blob/15bd7362/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala index 735d52f..758bcd7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala @@ -93,7 +93,7 @@ private[sql] case class InsertIntoHadoopFsRelation( val isAppend = pathExists && (mode == SaveMode.Append) if (doInsertion) { - val job = new Job(hadoopConf) + val job = Job.getInstance(hadoopConf) job.setOutputKeyClass(classOf[Void]) job.setOutputValueClass(classOf[InternalRow]) FileOutputFormat.setOutputPath(job, qualifiedOutputPath) http://git-wip-us.apache.org/repos/asf/spark/blob/15bd7362/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala index eea780c..12f8783 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala @@ -26,10 +26,10 @@ import org.apache.hadoop.conf.{Configurable, Configuration} import org.apache.hadoop.io.Writable import org.apache.hadoop.mapreduce._ import org.apache.hadoop.mapreduce.lib.input.{CombineFileSplit, FileSplit} +import org.apache.hadoop.mapreduce.task.{JobContextImpl, TaskAttemptContextImpl} import org.apache.spark.broadcast.Broadcast import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.executor.DataReadMethod -import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil import org.apache.spark.sql.{SQLConf, SQLContext} import org.apache.spark.sql.execution.datasources.parquet.UnsafeRowParquetRecordReader import org.apache.spark.storage.StorageLevel @@ -68,16 +68,14 @@ private[spark] class SqlNewHadoopRDD[V: ClassTag]( initLocalJobFuncOpt: Option[Job => Unit], inputFormatClass: Class[_ <: InputFormat[Void, V]], valueClass: Class[V]) - extends RDD[V](sqlContext.sparkContext, Nil) - with SparkHadoopMapReduceUtil - with Logging { + extends RDD[V](sqlContext.sparkContext, Nil) with Logging { protected def getJob(): Job = { - val conf: Configuration = broadcastedConf.value.value + val conf = broadcastedConf.value.value // "new Job" will make a copy of the conf. Then, it is // safe to mutate conf properties with initLocalJobFuncOpt // and initDriverSideJobFuncOpt. - val newJob = new Job(conf) + val newJob = Job.getInstance(conf) initLocalJobFuncOpt.map(f => f(newJob)) newJob } @@ -87,7 +85,7 @@ private[spark] class SqlNewHadoopRDD[V: ClassTag]( if (isDriverSide) { initDriverSideJobFuncOpt.map(f => f(job)) } - SparkHadoopUtil.get.getConfigurationFromJobContext(job) + job.getConfiguration } private val jobTrackerId: String = { @@ -110,7 +108,7 @@ private[spark] class SqlNewHadoopRDD[V: ClassTag]( configurable.setConf(conf) case _ => } - val jobContext = newJobContext(conf, jobId) + val jobContext = new JobContextImpl(conf, jobId) val rawSplits = inputFormat.getSplits(jobContext).toArray val result = new Array[SparkPartition](rawSplits.size) for (i <- 0 until rawSplits.size) { @@ -154,8 +152,8 @@ private[spark] class SqlNewHadoopRDD[V: ClassTag]( configurable.setConf(conf) case _ => } - val attemptId = newTaskAttemptID(jobTrackerId, id, isMap = true, split.index, 0) - val hadoopAttemptContext = newTaskAttemptContext(conf, attemptId) + val attemptId = new TaskAttemptID(jobTrackerId, id, TaskType.MAP, split.index, 0) + val hadoopAttemptContext = new TaskAttemptContextImpl(conf, attemptId) private[this] var reader: RecordReader[Void, V] = null /** http://git-wip-us.apache.org/repos/asf/spark/blob/15bd7362/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala index 983f4df..8b0b647 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala @@ -24,10 +24,10 @@ import scala.collection.JavaConverters._ import org.apache.hadoop.fs.Path import org.apache.hadoop.mapreduce._ import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter => MapReduceFileOutputCommitter} +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl + import org.apache.spark._ -import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.mapred.SparkHadoopMapRedUtil -import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.InternalRow @@ -41,14 +41,12 @@ private[sql] abstract class BaseWriterContainer( @transient val relation: HadoopFsRelation, @transient private val job: Job, isAppend: Boolean) - extends SparkHadoopMapReduceUtil - with Logging - with Serializable { + extends Logging with Serializable { protected val dataSchema = relation.dataSchema protected val serializableConf = - new SerializableConfiguration(SparkHadoopUtil.get.getConfigurationFromJobContext(job)) + new SerializableConfiguration(job.getConfiguration) // This UUID is used to avoid output file name collision between different appending write jobs. // These jobs may belong to different SparkContext instances. Concrete data source implementations @@ -90,8 +88,7 @@ private[sql] abstract class BaseWriterContainer( // This UUID is sent to executor side together with the serialized `Configuration` object within // the `Job` instance. `OutputWriters` on the executor side should use this UUID to generate // unique task output files. - SparkHadoopUtil.get.getConfigurationFromJobContext(job). - set("spark.sql.sources.writeJobUUID", uniqueWriteJobId.toString) + job.getConfiguration.set("spark.sql.sources.writeJobUUID", uniqueWriteJobId.toString) // Order of the following two lines is important. For Hadoop 1, TaskAttemptContext constructor // clones the Configuration object passed in. If we initialize the TaskAttemptContext first, @@ -101,7 +98,7 @@ private[sql] abstract class BaseWriterContainer( // committer, since their initialization involve the job configuration, which can be potentially // decorated in `prepareJobForWrite`. outputWriterFactory = relation.prepareJobForWrite(job) - taskAttemptContext = newTaskAttemptContext(serializableConf.value, taskAttemptId) + taskAttemptContext = new TaskAttemptContextImpl(serializableConf.value, taskAttemptId) outputFormatClass = job.getOutputFormatClass outputCommitter = newOutputCommitter(taskAttemptContext) @@ -111,7 +108,7 @@ private[sql] abstract class BaseWriterContainer( def executorSideSetup(taskContext: TaskContext): Unit = { setupIDs(taskContext.stageId(), taskContext.partitionId(), taskContext.attemptNumber()) setupConf() - taskAttemptContext = newTaskAttemptContext(serializableConf.value, taskAttemptId) + taskAttemptContext = new TaskAttemptContextImpl(serializableConf.value, taskAttemptId) outputCommitter = newOutputCommitter(taskAttemptContext) outputCommitter.setupTask(taskAttemptContext) } @@ -166,7 +163,7 @@ private[sql] abstract class BaseWriterContainer( "because spark.speculation is configured to be true.") defaultOutputCommitter } else { - val configuration = SparkHadoopUtil.get.getConfigurationFromJobContext(context) + val configuration = context.getConfiguration val committerClass = configuration.getClass( SQLConf.OUTPUT_COMMITTER_CLASS.key, null, classOf[OutputCommitter]) @@ -201,10 +198,8 @@ private[sql] abstract class BaseWriterContainer( private def setupIDs(jobId: Int, splitId: Int, attemptId: Int): Unit = { this.jobId = SparkHadoopWriter.createJobID(new Date, jobId) - this.taskId = new TaskID(this.jobId, true, splitId) - // scalastyle:off jobcontext + this.taskId = new TaskID(this.jobId, TaskType.MAP, splitId) this.taskAttemptId = new TaskAttemptID(taskId, attemptId) - // scalastyle:on jobcontext } private def setupConf(): Unit = { @@ -250,7 +245,7 @@ private[sql] class DefaultWriterContainer( def writeRows(taskContext: TaskContext, iterator: Iterator[InternalRow]): Unit = { executorSideSetup(taskContext) - val configuration = SparkHadoopUtil.get.getConfigurationFromJobContext(taskAttemptContext) + val configuration = taskAttemptContext.getConfiguration configuration.set("spark.sql.sources.output.path", outputPath) val writer = newOutputWriter(getWorkPath) writer.initConverter(dataSchema) @@ -421,7 +416,7 @@ private[sql] class DynamicPartitionWriterContainer( def newOutputWriter(key: InternalRow): OutputWriter = { val partitionPath = getPartitionString(key).getString(0) val path = new Path(getWorkPath, partitionPath) - val configuration = SparkHadoopUtil.get.getConfigurationFromJobContext(taskAttemptContext) + val configuration = taskAttemptContext.getConfiguration configuration.set( "spark.sql.sources.output.path", new Path(outputPath, partitionPath).toString) val newWriter = super.newOutputWriter(path.toString) http://git-wip-us.apache.org/repos/asf/spark/blob/15bd7362/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala index 3e61ba3..54a8552 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala @@ -30,8 +30,6 @@ import org.apache.hadoop.mapreduce.{Job, RecordWriter, TaskAttemptContext} import org.apache.spark.Logging import org.apache.spark.broadcast.Broadcast -import org.apache.spark.deploy.SparkHadoopUtil -import org.apache.spark.mapred.SparkHadoopMapRedUtil import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.UnsafeProjection @@ -89,8 +87,8 @@ private[sql] class JSONRelation( override val needConversion: Boolean = false private def createBaseRdd(inputPaths: Array[FileStatus]): RDD[String] = { - val job = new Job(sqlContext.sparkContext.hadoopConfiguration) - val conf = SparkHadoopUtil.get.getConfigurationFromJobContext(job) + val job = Job.getInstance(sqlContext.sparkContext.hadoopConfiguration) + val conf = job.getConfiguration val paths = inputPaths.map(_.getPath) @@ -176,7 +174,7 @@ private[json] class JsonOutputWriter( path: String, dataSchema: StructType, context: TaskAttemptContext) - extends OutputWriter with SparkHadoopMapRedUtil with Logging { + extends OutputWriter with Logging { private[this] val writer = new CharArrayWriter() // create the Generator without separator inserted between 2 records @@ -186,9 +184,9 @@ private[json] class JsonOutputWriter( private val recordWriter: RecordWriter[NullWritable, Text] = { new TextOutputFormat[NullWritable, Text]() { override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = { - val configuration = SparkHadoopUtil.get.getConfigurationFromJobContext(context) + val configuration = context.getConfiguration val uniqueWriteJobId = configuration.get("spark.sql.sources.writeJobUUID") - val taskAttemptId = SparkHadoopUtil.get.getTaskAttemptIDFromTaskAttemptContext(context) + val taskAttemptId = context.getTaskAttemptID val split = taskAttemptId.getTaskID.getId new Path(path, f"part-r-$split%05d-$uniqueWriteJobId$extension") } http://git-wip-us.apache.org/repos/asf/spark/blob/15bd7362/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala index a958373..e5d8e60 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala @@ -58,9 +58,7 @@ private[parquet] class CatalystReadSupport extends ReadSupport[InternalRow] with */ override def init(context: InitContext): ReadContext = { catalystRequestedSchema = { - // scalastyle:off jobcontext val conf = context.getConfiguration - // scalastyle:on jobcontext val schemaString = conf.get(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA) assert(schemaString != null, "Parquet requested schema not set.") StructType.fromString(schemaString) http://git-wip-us.apache.org/repos/asf/spark/blob/15bd7362/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/DirectParquetOutputCommitter.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/DirectParquetOutputCommitter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/DirectParquetOutputCommitter.scala index 1a4e99f..e54f51e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/DirectParquetOutputCommitter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/DirectParquetOutputCommitter.scala @@ -54,11 +54,7 @@ private[datasources] class DirectParquetOutputCommitter( override def setupTask(taskContext: TaskAttemptContext): Unit = {} override def commitJob(jobContext: JobContext) { - val configuration = { - // scalastyle:off jobcontext - ContextUtil.getConfiguration(jobContext) - // scalastyle:on jobcontext - } + val configuration = ContextUtil.getConfiguration(jobContext) val fileSystem = outputPath.getFileSystem(configuration) if (configuration.getBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, true)) { http://git-wip-us.apache.org/repos/asf/spark/blob/15bd7362/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala index 1af2a39..af964b4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala @@ -31,6 +31,7 @@ import org.apache.hadoop.fs.{FileStatus, Path} import org.apache.hadoop.io.Writable import org.apache.hadoop.mapreduce._ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat +import org.apache.hadoop.mapreduce.task.JobContextImpl import org.apache.parquet.filter2.predicate.FilterApi import org.apache.parquet.hadoop._ import org.apache.parquet.hadoop.metadata.CompressionCodecName @@ -40,7 +41,6 @@ import org.apache.parquet.{Log => ApacheParquetLog} import org.slf4j.bridge.SLF4JBridgeHandler import org.apache.spark.broadcast.Broadcast -import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.rdd.{RDD, SqlNewHadoopPartition, SqlNewHadoopRDD} import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.InternalRow @@ -82,9 +82,9 @@ private[sql] class ParquetOutputWriter(path: String, context: TaskAttemptContext // `FileOutputCommitter.getWorkPath()`, which points to the base directory of all // partitions in the case of dynamic partitioning. override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = { - val configuration = SparkHadoopUtil.get.getConfigurationFromJobContext(context) + val configuration = context.getConfiguration val uniqueWriteJobId = configuration.get("spark.sql.sources.writeJobUUID") - val taskAttemptId = SparkHadoopUtil.get.getTaskAttemptIDFromTaskAttemptContext(context) + val taskAttemptId = context.getTaskAttemptID val split = taskAttemptId.getTaskID.getId new Path(path, f"part-r-$split%05d-$uniqueWriteJobId$extension") } @@ -217,11 +217,7 @@ private[sql] class ParquetRelation( override def sizeInBytes: Long = metadataCache.dataStatuses.map(_.getLen).sum override def prepareJobForWrite(job: Job): OutputWriterFactory = { - val conf = { - // scalastyle:off jobcontext - ContextUtil.getConfiguration(job) - // scalastyle:on jobcontext - } + val conf = ContextUtil.getConfiguration(job) // SPARK-9849 DirectParquetOutputCommitter qualified name should be backward compatible val committerClassName = conf.get(SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key) @@ -340,7 +336,7 @@ private[sql] class ParquetRelation( // URI of the path to create a new Path. val pathWithEscapedAuthority = escapePathUserInfo(f.getPath) new FileStatus( - f.getLen, f.isDir, f.getReplication, f.getBlockSize, f.getModificationTime, + f.getLen, f.isDirectory, f.getReplication, f.getBlockSize, f.getModificationTime, f.getAccessTime, f.getPermission, f.getOwner, f.getGroup, pathWithEscapedAuthority) }.toSeq @@ -359,7 +355,7 @@ private[sql] class ParquetRelation( } } - val jobContext = newJobContext(getConf(isDriverSide = true), jobId) + val jobContext = new JobContextImpl(getConf(isDriverSide = true), jobId) val rawSplits = inputFormat.getSplits(jobContext) Array.tabulate[SparkPartition](rawSplits.size) { i => @@ -564,7 +560,7 @@ private[sql] object ParquetRelation extends Logging { parquetFilterPushDown: Boolean, assumeBinaryIsString: Boolean, assumeInt96IsTimestamp: Boolean)(job: Job): Unit = { - val conf = SparkHadoopUtil.get.getConfigurationFromJobContext(job) + val conf = job.getConfiguration conf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[CatalystReadSupport].getName) // Try to push down filters when filter push-down is enabled. @@ -607,7 +603,7 @@ private[sql] object ParquetRelation extends Logging { FileInputFormat.setInputPaths(job, inputFiles.map(_.getPath): _*) } - overrideMinSplitSize(parquetBlockSize, SparkHadoopUtil.get.getConfigurationFromJobContext(job)) + overrideMinSplitSize(parquetBlockSize, job.getConfiguration) } private[parquet] def readSchema( http://git-wip-us.apache.org/repos/asf/spark/blob/15bd7362/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala index 41fcb11..248467a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala @@ -26,8 +26,6 @@ import org.apache.hadoop.mapreduce.{RecordWriter, TaskAttemptContext, Job} import org.apache.hadoop.mapreduce.lib.input.FileInputFormat import org.apache.spark.broadcast.Broadcast -import org.apache.spark.deploy.SparkHadoopUtil -import org.apache.spark.mapred.SparkHadoopMapRedUtil import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.UnsafeRow @@ -88,8 +86,8 @@ private[sql] class TextRelation( filters: Array[Filter], inputPaths: Array[FileStatus], broadcastedConf: Broadcast[SerializableConfiguration]): RDD[InternalRow] = { - val job = new Job(sqlContext.sparkContext.hadoopConfiguration) - val conf = SparkHadoopUtil.get.getConfigurationFromJobContext(job) + val job = Job.getInstance(sqlContext.sparkContext.hadoopConfiguration) + val conf = job.getConfiguration val paths = inputPaths.map(_.getPath).sortBy(_.toUri) if (paths.nonEmpty) { @@ -138,17 +136,16 @@ private[sql] class TextRelation( } class TextOutputWriter(path: String, dataSchema: StructType, context: TaskAttemptContext) - extends OutputWriter - with SparkHadoopMapRedUtil { + extends OutputWriter { private[this] val buffer = new Text() private val recordWriter: RecordWriter[NullWritable, Text] = { new TextOutputFormat[NullWritable, Text]() { override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = { - val configuration = SparkHadoopUtil.get.getConfigurationFromJobContext(context) + val configuration = context.getConfiguration val uniqueWriteJobId = configuration.get("spark.sql.sources.writeJobUUID") - val taskAttemptId = SparkHadoopUtil.get.getTaskAttemptIDFromTaskAttemptContext(context) + val taskAttemptId = context.getTaskAttemptID val split = taskAttemptId.getTaskID.getId new Path(path, f"part-r-$split%05d-$uniqueWriteJobId$extension") } http://git-wip-us.apache.org/repos/asf/spark/blob/15bd7362/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala index fc8ce69..d6c5d14 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala @@ -462,7 +462,7 @@ abstract class HadoopFsRelation private[sql]( name.toLowerCase == "_temporary" || name.startsWith(".") } - val (dirs, files) = statuses.partition(_.isDir) + val (dirs, files) = statuses.partition(_.isDirectory) // It uses [[LinkedHashSet]] since the order of files can affect the results. (SPARK-11500) if (dirs.isEmpty) { @@ -858,10 +858,10 @@ private[sql] object HadoopFsRelation extends Logging { val jobConf = new JobConf(fs.getConf, this.getClass()) val pathFilter = FileInputFormat.getInputPathFilter(jobConf) if (pathFilter != null) { - val (dirs, files) = fs.listStatus(status.getPath, pathFilter).partition(_.isDir) + val (dirs, files) = fs.listStatus(status.getPath, pathFilter).partition(_.isDirectory) files ++ dirs.flatMap(dir => listLeafFiles(fs, dir)) } else { - val (dirs, files) = fs.listStatus(status.getPath).partition(_.isDir) + val (dirs, files) = fs.listStatus(status.getPath).partition(_.isDirectory) files ++ dirs.flatMap(dir => listLeafFiles(fs, dir)) } } @@ -896,7 +896,7 @@ private[sql] object HadoopFsRelation extends Logging { FakeFileStatus( status.getPath.toString, status.getLen, - status.isDir, + status.isDirectory, status.getReplication, status.getBlockSize, status.getModificationTime, http://git-wip-us.apache.org/repos/asf/spark/blob/15bd7362/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index 384ea21..5d00e73 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -380,7 +380,7 @@ class HiveContext private[hive]( def calculateTableSize(fs: FileSystem, path: Path): Long = { val fileStatus = fs.getFileStatus(path) - val size = if (fileStatus.isDir) { + val size = if (fileStatus.isDirectory) { fs.listStatus(path) .map { status => if (!status.getPath().getName().startsWith(stagingDir)) { http://git-wip-us.apache.org/repos/asf/spark/blob/15bd7362/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala index 598ccde..d3da22a 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala @@ -31,9 +31,7 @@ import org.apache.hadoop.hive.ql.metadata.Hive import org.apache.hadoop.hive.ql.processors._ import org.apache.hadoop.hive.ql.session.SessionState import org.apache.hadoop.hive.ql.{Driver, metadata} -import org.apache.hadoop.hive.shims.{HadoopShims, ShimLoader} import org.apache.hadoop.security.UserGroupInformation -import org.apache.hadoop.util.VersionInfo import org.apache.spark.{SparkConf, SparkException, Logging} import org.apache.spark.sql.catalyst.expressions.Expression @@ -65,74 +63,6 @@ private[hive] class ClientWrapper( extends ClientInterface with Logging { - overrideHadoopShims() - - // !! HACK ALERT !! - // - // Internally, Hive `ShimLoader` tries to load different versions of Hadoop shims by checking - // major version number gathered from Hadoop jar files: - // - // - For major version number 1, load `Hadoop20SShims`, where "20S" stands for Hadoop 0.20 with - // security. - // - For major version number 2, load `Hadoop23Shims`, where "23" stands for Hadoop 0.23. - // - // However, APIs in Hadoop 2.0.x and 2.1.x versions were in flux due to historical reasons. It - // turns out that Hadoop 2.0.x versions should also be used together with `Hadoop20SShims`, but - // `Hadoop23Shims` is chosen because the major version number here is 2. - // - // To fix this issue, we try to inspect Hadoop version via `org.apache.hadoop.utils.VersionInfo` - // and load `Hadoop20SShims` for Hadoop 1.x and 2.0.x versions. If Hadoop version information is - // not available, we decide whether to override the shims or not by checking for existence of a - // probe method which doesn't exist in Hadoop 1.x or 2.0.x versions. - private def overrideHadoopShims(): Unit = { - val hadoopVersion = VersionInfo.getVersion - val VersionPattern = """(\d+)\.(\d+).*""".r - - hadoopVersion match { - case null => - logError("Failed to inspect Hadoop version") - - // Using "Path.getPathWithoutSchemeAndAuthority" as the probe method. - val probeMethod = "getPathWithoutSchemeAndAuthority" - if (!classOf[Path].getDeclaredMethods.exists(_.getName == probeMethod)) { - logInfo( - s"Method ${classOf[Path].getCanonicalName}.$probeMethod not found, " + - s"we are probably using Hadoop 1.x or 2.0.x") - loadHadoop20SShims() - } - - case VersionPattern(majorVersion, minorVersion) => - logInfo(s"Inspected Hadoop version: $hadoopVersion") - - // Loads Hadoop20SShims for 1.x and 2.0.x versions - val (major, minor) = (majorVersion.toInt, minorVersion.toInt) - if (major < 2 || (major == 2 && minor == 0)) { - loadHadoop20SShims() - } - } - - // Logs the actual loaded Hadoop shims class - val loadedShimsClassName = ShimLoader.getHadoopShims.getClass.getCanonicalName - logInfo(s"Loaded $loadedShimsClassName for Hadoop version $hadoopVersion") - } - - private def loadHadoop20SShims(): Unit = { - val hadoop20SShimsClassName = "org.apache.hadoop.hive.shims.Hadoop20SShims" - logInfo(s"Loading Hadoop shims $hadoop20SShimsClassName") - - try { - val shimsField = classOf[ShimLoader].getDeclaredField("hadoopShims") - // scalastyle:off classforname - val shimsClass = Class.forName(hadoop20SShimsClassName) - // scalastyle:on classforname - val shims = classOf[HadoopShims].cast(shimsClass.newInstance()) - shimsField.setAccessible(true) - shimsField.set(null, shims) - } catch { case cause: Throwable => - throw new RuntimeException(s"Failed to load $hadoop20SShimsClassName", cause) - } - } - // Circular buffer to hold what hive prints to STDOUT and ERR. Only printed when failures occur. private val outputBuffer = new CircularBuffer() --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org