Repository: spark
Updated Branches:
refs/heads/master 2e35e2429 -> 26d31d15f
Revert "SPARK-1209 [CORE] SparkHadoop{MapRed,MapReduce}Util should not use
package org.apache.hadoop"
This reverts commit 68cb69daf3022e973422e496ccf827ca3806ff30.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/26d31d15
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/26d31d15
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/26d31d15
Branch: refs/heads/master
Commit: 26d31d15fda3f63707a28d1a1115770ad127cf8f
Parents: 2e35e24
Author: Andrew Or <[email protected]>
Authored: Thu Oct 30 17:56:10 2014 -0700
Committer: Andrew Or <[email protected]>
Committed: Thu Oct 30 17:56:10 2014 -0700
----------------------------------------------------------------------
.../hadoop/mapred/SparkHadoopMapRedUtil.scala | 54 +++++++++++++
.../mapreduce/SparkHadoopMapReduceUtil.scala | 79 +++++++++++++++++++
.../org/apache/spark/SparkHadoopWriter.scala | 1 -
.../spark/mapred/SparkHadoopMapRedUtil.scala | 56 --------------
.../mapreduce/SparkHadoopMapReduceUtil.scala | 80 --------------------
.../org/apache/spark/rdd/NewHadoopRDD.scala | 1 -
.../org/apache/spark/rdd/PairRDDFunctions.scala | 3 +-
project/MimaExcludes.scala | 8 --
.../sql/parquet/ParquetTableOperations.scala | 1 -
.../spark/sql/hive/hiveWriterContainers.scala | 1 -
10 files changed, 134 insertions(+), 150 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/26d31d15/core/src/main/scala/org/apache/hadoop/mapred/SparkHadoopMapRedUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/hadoop/mapred/SparkHadoopMapRedUtil.scala b/core/src/main/scala/org/apache/hadoop/mapred/SparkHadoopMapRedUtil.scala
new file mode 100644
index 0000000..0c47afa
--- /dev/null
+++ b/core/src/main/scala/org/apache/hadoop/mapred/SparkHadoopMapRedUtil.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred
+
+private[apache]
+trait SparkHadoopMapRedUtil {
+  def newJobContext(conf: JobConf, jobId: JobID): JobContext = {
+    val klass = firstAvailableClass("org.apache.hadoop.mapred.JobContextImpl",
+      "org.apache.hadoop.mapred.JobContext")
+    val ctor = klass.getDeclaredConstructor(classOf[JobConf],
+      classOf[org.apache.hadoop.mapreduce.JobID])
+    ctor.newInstance(conf, jobId).asInstanceOf[JobContext]
+  }
+
+  def newTaskAttemptContext(conf: JobConf, attemptId: TaskAttemptID): TaskAttemptContext = {
+    val klass = firstAvailableClass("org.apache.hadoop.mapred.TaskAttemptContextImpl",
+      "org.apache.hadoop.mapred.TaskAttemptContext")
+    val ctor = klass.getDeclaredConstructor(classOf[JobConf], classOf[TaskAttemptID])
+    ctor.newInstance(conf, attemptId).asInstanceOf[TaskAttemptContext]
+  }
+
+  def newTaskAttemptID(
+      jtIdentifier: String,
+      jobId: Int,
+      isMap: Boolean,
+      taskId: Int,
+      attemptId: Int) = {
+    new TaskAttemptID(jtIdentifier, jobId, isMap, taskId, attemptId)
+  }
+
+  private def firstAvailableClass(first: String, second: String): Class[_] = {
+    try {
+      Class.forName(first)
+    } catch {
+      case e: ClassNotFoundException =>
+        Class.forName(second)
+    }
+  }
+}
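For context, the restored mapred-package trait above is mixed into classes such as SparkHadoopWriter so that old-API (mapred) job and task contexts can be built reflectively against either Hadoop 1.x or 2.x. A minimal, hypothetical sketch of such a consumer follows; the package, class name and argument values are illustrative only and are not part of this commit:

    // Hypothetical usage sketch only; not part of this commit.
    // The trait is private[apache], so a caller has to live under the org.apache package.
    package org.apache.spark.example

    import org.apache.hadoop.mapred.{JobConf, JobID, SparkHadoopMapRedUtil, TaskAttemptID}

    class ExampleMapRedCommitter(conf: JobConf) extends SparkHadoopMapRedUtil {
      def contexts(jobTrackerId: String, jobId: Int, splitId: Int, attempt: Int) = {
        // Resolves JobContextImpl/TaskAttemptContextImpl on Hadoop 2.x and the concrete
        // JobContext/TaskAttemptContext classes on Hadoop 1.x, via firstAvailableClass.
        val jobContext = newJobContext(conf, new JobID(jobTrackerId, jobId))
        val attemptId: TaskAttemptID =
          newTaskAttemptID(jobTrackerId, jobId, isMap = false, splitId, attempt)
        val taskContext = newTaskAttemptContext(conf, attemptId)
        (jobContext, taskContext)
      }
    }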
http://git-wip-us.apache.org/repos/asf/spark/blob/26d31d15/core/src/main/scala/org/apache/hadoop/mapreduce/SparkHadoopMapReduceUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/hadoop/mapreduce/SparkHadoopMapReduceUtil.scala b/core/src/main/scala/org/apache/hadoop/mapreduce/SparkHadoopMapReduceUtil.scala
new file mode 100644
index 0000000..1fca572
--- /dev/null
+++ b/core/src/main/scala/org/apache/hadoop/mapreduce/SparkHadoopMapReduceUtil.scala
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce
+
+import java.lang.{Boolean => JBoolean, Integer => JInteger}
+
+import org.apache.hadoop.conf.Configuration
+
+private[apache]
+trait SparkHadoopMapReduceUtil {
+  def newJobContext(conf: Configuration, jobId: JobID): JobContext = {
+    val klass = firstAvailableClass(
+        "org.apache.hadoop.mapreduce.task.JobContextImpl",  // hadoop2, hadoop2-yarn
+        "org.apache.hadoop.mapreduce.JobContext")           // hadoop1
+    val ctor = klass.getDeclaredConstructor(classOf[Configuration], classOf[JobID])
+    ctor.newInstance(conf, jobId).asInstanceOf[JobContext]
+  }
+
+  def newTaskAttemptContext(conf: Configuration, attemptId: TaskAttemptID): TaskAttemptContext = {
+    val klass = firstAvailableClass(
+        "org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl",  // hadoop2, hadoop2-yarn
+        "org.apache.hadoop.mapreduce.TaskAttemptContext")           // hadoop1
+    val ctor = klass.getDeclaredConstructor(classOf[Configuration], classOf[TaskAttemptID])
+    ctor.newInstance(conf, attemptId).asInstanceOf[TaskAttemptContext]
+  }
+
+  def newTaskAttemptID(
+      jtIdentifier: String,
+      jobId: Int,
+      isMap: Boolean,
+      taskId: Int,
+      attemptId: Int) = {
+    val klass = Class.forName("org.apache.hadoop.mapreduce.TaskAttemptID")
+    try {
+      // First, attempt to use the old-style constructor that takes a boolean isMap
+      // (not available in YARN)
+      val ctor = klass.getDeclaredConstructor(classOf[String], classOf[Int], classOf[Boolean],
+        classOf[Int], classOf[Int])
+      ctor.newInstance(jtIdentifier, new JInteger(jobId), new JBoolean(isMap), new JInteger(taskId),
+        new JInteger(attemptId)).asInstanceOf[TaskAttemptID]
+    } catch {
+      case exc: NoSuchMethodException => {
+        // If that failed, look for the new constructor that takes a TaskType (not available in 1.x)
+        val taskTypeClass = Class.forName("org.apache.hadoop.mapreduce.TaskType")
+          .asInstanceOf[Class[Enum[_]]]
+        val taskType = taskTypeClass.getMethod("valueOf", classOf[String]).invoke(
+          taskTypeClass, if(isMap) "MAP" else "REDUCE")
+        val ctor = klass.getDeclaredConstructor(classOf[String], classOf[Int], taskTypeClass,
+          classOf[Int], classOf[Int])
+        ctor.newInstance(jtIdentifier, new JInteger(jobId), taskType, new JInteger(taskId),
+          new JInteger(attemptId)).asInstanceOf[TaskAttemptID]
+      }
+    }
+  }
+
+  private def firstAvailableClass(first: String, second: String): Class[_] = {
+    try {
+      Class.forName(first)
+    } catch {
+      case e: ClassNotFoundException =>
+        Class.forName(second)
+    }
+  }
+}
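Similarly, a brief hypothetical sketch of how the new-API trait above is consumed (mirroring, but not copied from, the saveAsNewAPIHadoopDataset path in PairRDDFunctions; the package, class name and literal values are illustrative):

    // Hypothetical usage sketch only; not part of this commit.
    package org.apache.spark.example

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.mapreduce.SparkHadoopMapReduceUtil

    class ExampleNewApiCommitter(conf: Configuration) extends SparkHadoopMapReduceUtil {
      def taskContextFor(stageId: Int, splitId: Int, attemptNumber: Int) = {
        val jobTrackerId = "20141030175610"  // conventionally a formatted timestamp
        // newTaskAttemptID first tries the Hadoop 1.x constructor (boolean isMap) and
        // falls back to the 2.x/YARN constructor (TaskType) when that one is missing.
        val attemptId = newTaskAttemptID(jobTrackerId, stageId, isMap = false, splitId, attemptNumber)
        newTaskAttemptContext(conf, attemptId)
      }
    }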
http://git-wip-us.apache.org/repos/asf/spark/blob/26d31d15/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
index 4023759..376e69c 100644
--- a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
+++ b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
@@ -26,7 +26,6 @@ import org.apache.hadoop.mapred._
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path
-import org.apache.spark.mapred.SparkHadoopMapRedUtil
import org.apache.spark.rdd.HadoopRDD
/**
http://git-wip-us.apache.org/repos/asf/spark/blob/26d31d15/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala b/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
deleted file mode 100644
index fe2bc65..0000000
--- a/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.mapred
-
-import org.apache.hadoop.mapred.{TaskAttemptID, JobID, JobConf, JobContext, TaskAttemptContext}
-
-private[spark]
-trait SparkHadoopMapRedUtil {
-  def newJobContext(conf: JobConf, jobId: JobID): JobContext = {
-    val klass = firstAvailableClass("org.apache.hadoop.mapred.JobContextImpl",
-      "org.apache.hadoop.mapred.JobContext")
-    val ctor = klass.getDeclaredConstructor(classOf[JobConf],
-      classOf[org.apache.hadoop.mapreduce.JobID])
-    ctor.newInstance(conf, jobId).asInstanceOf[JobContext]
-  }
-
-  def newTaskAttemptContext(conf: JobConf, attemptId: TaskAttemptID): TaskAttemptContext = {
-    val klass = firstAvailableClass("org.apache.hadoop.mapred.TaskAttemptContextImpl",
-      "org.apache.hadoop.mapred.TaskAttemptContext")
-    val ctor = klass.getDeclaredConstructor(classOf[JobConf], classOf[TaskAttemptID])
-    ctor.newInstance(conf, attemptId).asInstanceOf[TaskAttemptContext]
-  }
-
-  def newTaskAttemptID(
-      jtIdentifier: String,
-      jobId: Int,
-      isMap: Boolean,
-      taskId: Int,
-      attemptId: Int) = {
-    new TaskAttemptID(jtIdentifier, jobId, isMap, taskId, attemptId)
-  }
-
-  private def firstAvailableClass(first: String, second: String): Class[_] = {
-    try {
-      Class.forName(first)
-    } catch {
-      case e: ClassNotFoundException =>
-        Class.forName(second)
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/26d31d15/core/src/main/scala/org/apache/spark/mapreduce/SparkHadoopMapReduceUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/mapreduce/SparkHadoopMapReduceUtil.scala b/core/src/main/scala/org/apache/spark/mapreduce/SparkHadoopMapReduceUtil.scala
deleted file mode 100644
index 3340673..0000000
--- a/core/src/main/scala/org/apache/spark/mapreduce/SparkHadoopMapReduceUtil.scala
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.mapreduce
-
-import java.lang.{Boolean => JBoolean, Integer => JInteger}
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.mapreduce.{JobContext, JobID, TaskAttemptContext, TaskAttemptID}
-
-private[spark]
-trait SparkHadoopMapReduceUtil {
-  def newJobContext(conf: Configuration, jobId: JobID): JobContext = {
-    val klass = firstAvailableClass(
-        "org.apache.hadoop.mapreduce.task.JobContextImpl",  // hadoop2, hadoop2-yarn
-        "org.apache.hadoop.mapreduce.JobContext")           // hadoop1
-    val ctor = klass.getDeclaredConstructor(classOf[Configuration], classOf[JobID])
-    ctor.newInstance(conf, jobId).asInstanceOf[JobContext]
-  }
-
-  def newTaskAttemptContext(conf: Configuration, attemptId: TaskAttemptID): TaskAttemptContext = {
-    val klass = firstAvailableClass(
-        "org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl",  // hadoop2, hadoop2-yarn
-        "org.apache.hadoop.mapreduce.TaskAttemptContext")           // hadoop1
-    val ctor = klass.getDeclaredConstructor(classOf[Configuration], classOf[TaskAttemptID])
-    ctor.newInstance(conf, attemptId).asInstanceOf[TaskAttemptContext]
-  }
-
-  def newTaskAttemptID(
-      jtIdentifier: String,
-      jobId: Int,
-      isMap: Boolean,
-      taskId: Int,
-      attemptId: Int) = {
-    val klass = Class.forName("org.apache.hadoop.mapreduce.TaskAttemptID")
-    try {
-      // First, attempt to use the old-style constructor that takes a boolean isMap
-      // (not available in YARN)
-      val ctor = klass.getDeclaredConstructor(classOf[String], classOf[Int], classOf[Boolean],
-        classOf[Int], classOf[Int])
-      ctor.newInstance(jtIdentifier, new JInteger(jobId), new JBoolean(isMap), new JInteger(taskId),
-        new JInteger(attemptId)).asInstanceOf[TaskAttemptID]
-    } catch {
-      case exc: NoSuchMethodException => {
-        // If that failed, look for the new constructor that takes a TaskType (not available in 1.x)
-        val taskTypeClass = Class.forName("org.apache.hadoop.mapreduce.TaskType")
-          .asInstanceOf[Class[Enum[_]]]
-        val taskType = taskTypeClass.getMethod("valueOf", classOf[String]).invoke(
-          taskTypeClass, if(isMap) "MAP" else "REDUCE")
-        val ctor = klass.getDeclaredConstructor(classOf[String], classOf[Int], taskTypeClass,
-          classOf[Int], classOf[Int])
-        ctor.newInstance(jtIdentifier, new JInteger(jobId), taskType, new JInteger(taskId),
-          new JInteger(attemptId)).asInstanceOf[TaskAttemptID]
-      }
-    }
-  }
-
-  private def firstAvailableClass(first: String, second: String): Class[_] = {
-    try {
-      Class.forName(first)
-    } catch {
-      case e: ClassNotFoundException =>
-        Class.forName(second)
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/26d31d15/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index e7b1170..3245632 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -35,7 +35,6 @@ import org.apache.spark.Partition
import org.apache.spark.SerializableWritable
import org.apache.spark.{SparkContext, TaskContext}
import org.apache.spark.executor.{DataReadMethod, InputMetrics}
-import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
import org.apache.spark.rdd.NewHadoopRDD.NewHadoopMapPartitionsWithSplitRDD
import org.apache.spark.util.Utils
import org.apache.spark.deploy.SparkHadoopUtil
http://git-wip-us.apache.org/repos/asf/spark/blob/26d31d15/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 462f0d6..da89f63 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -33,14 +33,13 @@ import org.apache.hadoop.io.SequenceFile.CompressionType
import org.apache.hadoop.io.compress.CompressionCodec
import org.apache.hadoop.mapred.{FileOutputCommitter, FileOutputFormat, JobConf, OutputFormat}
import org.apache.hadoop.mapreduce.{Job => NewAPIHadoopJob, OutputFormat => NewOutputFormat,
-  RecordWriter => NewRecordWriter}
+  RecordWriter => NewRecordWriter, SparkHadoopMapReduceUtil}
import org.apache.spark._
import org.apache.spark.Partitioner.defaultPartitioner
import org.apache.spark.SparkContext._
import org.apache.spark.annotation.Experimental
import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
import org.apache.spark.partial.{BoundedDouble, PartialResult}
import org.apache.spark.serializer.Serializer
import org.apache.spark.util.Utils
http://git-wip-us.apache.org/repos/asf/spark/blob/26d31d15/project/MimaExcludes.scala
----------------------------------------------------------------------
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index a94d09b..6a0495f 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -77,14 +77,6 @@ object MimaExcludes {
// SPARK-3822
ProblemFilters.exclude[IncompatibleResultTypeProblem](
"org.apache.spark.SparkContext.org$apache$spark$SparkContext$$createTaskScheduler")
- ) ++ Seq(
- // SPARK-1209
- ProblemFilters.exclude[MissingClassProblem](
- "org.apache.hadoop.mapreduce.SparkHadoopMapReduceUtil"),
- ProblemFilters.exclude[MissingClassProblem](
- "org.apache.hadoop.mapred.SparkHadoopMapRedUtil"),
- ProblemFilters.exclude[MissingTypesProblem](
- "org.apache.spark.rdd.PairRDDFunctions")
)
case v if v.startsWith("1.1") =>
http://git-wip-us.apache.org/repos/asf/spark/blob/26d31d15/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
index 416bf56..9664c56 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
@@ -43,7 +43,6 @@ import parquet.hadoop.util.ContextUtil
import parquet.io.ParquetDecodingException
import parquet.schema.MessageType
-import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.SQLConf
http://git-wip-us.apache.org/repos/asf/spark/blob/26d31d15/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
index cc8bb3e..bf2ce9d 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
@@ -31,7 +31,6 @@ import org.apache.hadoop.hive.ql.plan.{PlanUtils, TableDesc}
import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapred._
-import org.apache.spark.mapred.SparkHadoopMapRedUtil
import org.apache.spark.sql.Row
import org.apache.spark.{Logging, SerializableWritable, SparkHadoopWriter}
import org.apache.spark.sql.hive.{ShimFileSinkDesc => FileSinkDesc}