Repository: spark
Updated Branches:
  refs/heads/master dc126f212 -> 5140598df


SPARK-1128: set hadoop task properties when constructing HadoopRDD

https://spark-project.atlassian.net/browse/SPARK-1128

The task properties are not set when constructing HadoopRDD in current 
implementation, this may limit the implementation based on

```
mapred.tip.id
mapred.task.id
mapred.task.is.map
mapred.task.partition
mapred.job.id
```

This patch also contains a small fix  in createJobID (SparkHadoopWriter.scala), 
where the current implementation actually is not using time parameter

Author: CodingCat <[email protected]>
Author: Nan Zhu <[email protected]>

Closes #101 from CodingCat/SPARK-1128 and squashes the following commits:

ed0980f [CodingCat] make SparkHiveHadoopWriter belongs to spark package
5b1ad7d [CodingCat] move SparkHiveHadoopWriter to org.apache.spark package
258f92c [CodingCat] code cleanup
af88939 [CodingCat] update the comments and permission of SparkHadoopWriter
9bd1fe3 [CodingCat] move configuration for jobConf to HadoopRDD
b7bdfa5 [Nan Zhu] style fix
a3153a8 [Nan Zhu] style fix
c3258d2 [CodingCat] set hadoop task properties while using InputFormat


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5140598d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5140598d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5140598d

Branch: refs/heads/master
Commit: 5140598df889f7227c9d6a7953031eeef524badd
Parents: dc126f2
Author: CodingCat <[email protected]>
Authored: Mon Mar 24 21:55:03 2014 -0700
Committer: Aaron Davidson <[email protected]>
Committed: Mon Mar 24 21:55:03 2014 -0700

----------------------------------------------------------------------
 .../org/apache/spark/SparkHadoopWriter.scala    |  29 +--
 .../scala/org/apache/spark/rdd/HadoopRDD.scala  |  25 ++-
 .../org/apache/spark/rdd/PairRDDFunctions.scala |   7 +-
 .../hadoop/mapred/SparkHadoopWriter.scala       | 197 -------------------
 .../org/apache/spark/SparkHadoopWriter.scala    | 195 ++++++++++++++++++
 .../apache/spark/sql/hive/hiveOperators.scala   |   2 +-
 6 files changed, 233 insertions(+), 222 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/5140598d/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala 
b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
index d404459..b92ea01 100644
--- a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
+++ b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
@@ -15,28 +15,26 @@
  * limitations under the License.
  */
 
-package org.apache.hadoop.mapred
+package org.apache.spark
 
 import java.io.IOException
 import java.text.NumberFormat
 import java.text.SimpleDateFormat
 import java.util.Date
 
+import org.apache.hadoop.mapred._
 import org.apache.hadoop.fs.FileSystem
 import org.apache.hadoop.fs.Path
 
-import org.apache.spark.Logging
-import org.apache.spark.SerializableWritable
+import org.apache.spark.rdd.HadoopRDD
 
 /**
- * Internal helper class that saves an RDD using a Hadoop OutputFormat. This 
is only public
- * because we need to access this class from the `spark` package to use some 
package-private Hadoop
- * functions, but this class should not be used directly by users.
+ * Internal helper class that saves an RDD using a Hadoop OutputFormat.
  *
  * Saves the RDD using a JobConf, which should contain an output key class, an 
output value class,
  * a filename to write to, etc, exactly like in a Hadoop MapReduce job.
  */
-private[apache]
+private[spark]
 class SparkHadoopWriter(@transient jobConf: JobConf)
   extends Logging
   with SparkHadoopMapRedUtil
@@ -59,7 +57,7 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
 
   def preSetup() {
     setIDs(0, 0, 0)
-    setConfParams()
+    HadoopRDD.addLocalConfiguration("", 0, 0, 0, conf.value)
     
     val jCtxt = getJobContext() 
     getOutputCommitter().setupJob(jCtxt)
@@ -68,7 +66,8 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
 
   def setup(jobid: Int, splitid: Int, attemptid: Int) {
     setIDs(jobid, splitid, attemptid)
-    setConfParams() 
+    HadoopRDD.addLocalConfiguration(new 
SimpleDateFormat("yyyyMMddHHmm").format(now),
+      jobid, splitID, attemptID, conf.value)
   }
 
   def open() {
@@ -167,21 +166,13 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
     taID = new SerializableWritable[TaskAttemptID](
         new TaskAttemptID(new TaskID(jID.value, true, splitID), attemptID))
   }
-
-  private def setConfParams() {
-    conf.value.set("mapred.job.id", jID.value.toString)
-    conf.value.set("mapred.tip.id", taID.value.getTaskID.toString)
-    conf.value.set("mapred.task.id", taID.value.toString)
-    conf.value.setBoolean("mapred.task.is.map", true)
-    conf.value.setInt("mapred.task.partition", splitID)
-  }
 }
 
-private[apache]
+private[spark]
 object SparkHadoopWriter {
   def createJobID(time: Date, id: Int): JobID = {
     val formatter = new SimpleDateFormat("yyyyMMddHHmm")
-    val jobtrackerID = formatter.format(new Date())
+    val jobtrackerID = formatter.format(time)
     new JobID(jobtrackerID, id)
   }
   

http://git-wip-us.apache.org/repos/asf/spark/blob/5140598d/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala 
b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 100ddb3..932ff5b 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.rdd
 
+import java.text.SimpleDateFormat
+import java.util.Date
 import java.io.EOFException
 import scala.collection.immutable.Map
 
@@ -27,6 +29,9 @@ import org.apache.hadoop.mapred.InputSplit
 import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.mapred.RecordReader
 import org.apache.hadoop.mapred.Reporter
+import org.apache.hadoop.mapred.JobID
+import org.apache.hadoop.mapred.TaskAttemptID
+import org.apache.hadoop.mapred.TaskID
 import org.apache.hadoop.util.ReflectionUtils
 
 import org.apache.spark._
@@ -111,6 +116,9 @@ class HadoopRDD[K, V](
 
   protected val inputFormatCacheKey = "rdd_%d_input_format".format(id)
 
+  // used to build JobTracker ID
+  private val createTime = new Date()
+
   // Returns a JobConf that will be used on slaves to obtain input splits for 
Hadoop reads.
   protected def getJobConf(): JobConf = {
     val conf: Configuration = broadcastedConf.value.value
@@ -165,12 +173,14 @@ class HadoopRDD[K, V](
 
   override def compute(theSplit: Partition, context: TaskContext) = {
     val iter = new NextIterator[(K, V)] {
+
       val split = theSplit.asInstanceOf[HadoopPartition]
       logInfo("Input split: " + split.inputSplit)
       var reader: RecordReader[K, V] = null
-
       val jobConf = getJobConf()
       val inputFormat = getInputFormat(jobConf)
+      HadoopRDD.addLocalConfiguration(new 
SimpleDateFormat("yyyyMMddHHmm").format(createTime),
+        context.stageId, theSplit.index, context.attemptId.toInt, jobConf)
       reader = inputFormat.getRecordReader(split.inputSplit.value, jobConf, 
Reporter.NULL)
 
       // Register an on-task-completion callback to close the input stream.
@@ -222,4 +232,17 @@ private[spark] object HadoopRDD {
 
   def putCachedMetadata(key: String, value: Any) =
     SparkEnv.get.hadoopJobMetadata.put(key, value)
+
+  /** Add Hadoop configuration specific to a single partition and attempt. */
+  def addLocalConfiguration(jobTrackerId: String, jobId: Int, splitId: Int, 
attemptId: Int,
+                            conf: JobConf) {
+    val jobID = new JobID(jobTrackerId, jobId)
+    val taId = new TaskAttemptID(new TaskID(jobID, true, splitId), attemptId)
+
+    conf.set("mapred.tip.id", taId.getTaskID.toString)
+    conf.set("mapred.task.id", taId.toString)
+    conf.setBoolean("mapred.task.is.map", true)
+    conf.setInt("mapred.task.partition", splitId)
+    conf.set("mapred.job.id", jobID.toString)
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/5140598d/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala 
b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 75fc02a..14386ff 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -34,14 +34,13 @@ import org.apache.hadoop.fs.FileSystem
 import org.apache.hadoop.io.SequenceFile.CompressionType
 import org.apache.hadoop.io.compress.CompressionCodec
 import org.apache.hadoop.mapred.{FileOutputCommitter, FileOutputFormat, 
JobConf, OutputFormat}
-import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat, Job => 
NewAPIHadoopJob, RecordWriter => NewRecordWriter, SparkHadoopMapReduceUtil}
+import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat, Job => 
NewAPIHadoopJob,
+RecordWriter => NewRecordWriter, SparkHadoopMapReduceUtil}
 import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat => 
NewFileOutputFormat}
 
-// SparkHadoopWriter and SparkHadoopMapReduceUtil are actually source files 
defined in Spark.
-import org.apache.hadoop.mapred.SparkHadoopWriter
-
 import org.apache.spark._
 import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.SparkHadoopWriter
 import org.apache.spark.Partitioner.defaultPartitioner
 import org.apache.spark.SparkContext._
 import org.apache.spark.partial.{BoundedDouble, PartialResult}

http://git-wip-us.apache.org/repos/asf/spark/blob/5140598d/sql/hive/src/main/scala/org/apache/hadoop/mapred/SparkHadoopWriter.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/hadoop/mapred/SparkHadoopWriter.scala 
b/sql/hive/src/main/scala/org/apache/hadoop/mapred/SparkHadoopWriter.scala
deleted file mode 100644
index 0b38731..0000000
--- a/sql/hive/src/main/scala/org/apache/hadoop/mapred/SparkHadoopWriter.scala
+++ /dev/null
@@ -1,197 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.mapred
-
-import java.io.IOException
-import java.text.NumberFormat
-import java.util.Date
-
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.hive.ql.exec.{FileSinkOperator, Utilities}
-import org.apache.hadoop.hive.ql.io.{HiveFileFormatUtils, HiveOutputFormat}
-import org.apache.hadoop.hive.ql.plan.FileSinkDesc
-import org.apache.hadoop.io.Writable
-
-import org.apache.spark.Logging
-import org.apache.spark.SerializableWritable
-
-/**
- * Internal helper class that saves an RDD using a Hive OutputFormat.
- * It is based on [[SparkHadoopWriter]].
- */
-protected[apache]
-class SparkHiveHadoopWriter(
-    @transient jobConf: JobConf,
-    fileSinkConf: FileSinkDesc)
-  extends Logging
-  with SparkHadoopMapRedUtil
-  with Serializable {
-
-  private val now = new Date()
-  private val conf = new SerializableWritable(jobConf)
-
-  private var jobID = 0
-  private var splitID = 0
-  private var attemptID = 0
-  private var jID: SerializableWritable[JobID] = null
-  private var taID: SerializableWritable[TaskAttemptID] = null
-
-  @transient private var writer: FileSinkOperator.RecordWriter = null
-  @transient private var format: HiveOutputFormat[AnyRef, Writable] = null
-  @transient private var committer: OutputCommitter = null
-  @transient private var jobContext: JobContext = null
-  @transient private var taskContext: TaskAttemptContext = null
-
-  def preSetup() {
-    setIDs(0, 0, 0)
-    setConfParams()
-
-    val jCtxt = getJobContext()
-    getOutputCommitter().setupJob(jCtxt)
-  }
-
-
-  def setup(jobid: Int, splitid: Int, attemptid: Int) {
-    setIDs(jobid, splitid, attemptid)
-    setConfParams()
-  }
-
-  def open() {
-    val numfmt = NumberFormat.getInstance()
-    numfmt.setMinimumIntegerDigits(5)
-    numfmt.setGroupingUsed(false)
-
-    val extension = Utilities.getFileExtension(
-      conf.value,
-      fileSinkConf.getCompressed,
-      getOutputFormat())
-
-    val outputName = "part-"  + numfmt.format(splitID) + extension
-    val path = FileOutputFormat.getTaskOutputPath(conf.value, outputName)
-
-    getOutputCommitter().setupTask(getTaskContext())
-    writer = HiveFileFormatUtils.getHiveRecordWriter(
-      conf.value,
-      fileSinkConf.getTableInfo,
-      conf.value.getOutputValueClass.asInstanceOf[Class[Writable]],
-      fileSinkConf,
-      path,
-      null)
-  }
-
-  def write(value: Writable) {
-    if (writer != null) {
-      writer.write(value)
-    } else {
-      throw new IOException("Writer is null, open() has not been called")
-    }
-  }
-
-  def close() {
-    // Seems the boolean value passed into close does not matter.
-    writer.close(false)
-  }
-
-  def commit() {
-    val taCtxt = getTaskContext()
-    val cmtr = getOutputCommitter()
-    if (cmtr.needsTaskCommit(taCtxt)) {
-      try {
-        cmtr.commitTask(taCtxt)
-        logInfo (taID + ": Committed")
-      } catch {
-        case e: IOException => {
-          logError("Error committing the output of task: " + taID.value, e)
-          cmtr.abortTask(taCtxt)
-          throw e
-        }
-      }
-    } else {
-      logWarning ("No need to commit output of task: " + taID.value)
-    }
-  }
-
-  def commitJob() {
-    // always ? Or if cmtr.needsTaskCommit ?
-    val cmtr = getOutputCommitter()
-    cmtr.commitJob(getJobContext())
-  }
-
-  // ********* Private Functions *********
-
-  private def getOutputFormat(): HiveOutputFormat[AnyRef,Writable] = {
-    if (format == null) {
-      format = conf.value.getOutputFormat()
-        .asInstanceOf[HiveOutputFormat[AnyRef,Writable]]
-    }
-    format
-  }
-
-  private def getOutputCommitter(): OutputCommitter = {
-    if (committer == null) {
-      committer = conf.value.getOutputCommitter
-    }
-    committer
-  }
-
-  private def getJobContext(): JobContext = {
-    if (jobContext == null) {
-      jobContext = newJobContext(conf.value, jID.value)
-    }
-    jobContext
-  }
-
-  private def getTaskContext(): TaskAttemptContext = {
-    if (taskContext == null) {
-      taskContext =  newTaskAttemptContext(conf.value, taID.value)
-    }
-    taskContext
-  }
-
-  private def setIDs(jobid: Int, splitid: Int, attemptid: Int) {
-    jobID = jobid
-    splitID = splitid
-    attemptID = attemptid
-
-    jID = new SerializableWritable[JobID](SparkHadoopWriter.createJobID(now, 
jobid))
-    taID = new SerializableWritable[TaskAttemptID](
-      new TaskAttemptID(new TaskID(jID.value, true, splitID), attemptID))
-  }
-
-  private def setConfParams() {
-    conf.value.set("mapred.job.id", jID.value.toString)
-    conf.value.set("mapred.tip.id", taID.value.getTaskID.toString)
-    conf.value.set("mapred.task.id", taID.value.toString)
-    conf.value.setBoolean("mapred.task.is.map", true)
-    conf.value.setInt("mapred.task.partition", splitID)
-  }
-}
-
-object SparkHiveHadoopWriter {
-  def createPathFromString(path: String, conf: JobConf): Path = {
-    if (path == null) {
-      throw new IllegalArgumentException("Output path is null")
-    }
-    val outputPath = new Path(path)
-    val fs = outputPath.getFileSystem(conf)
-    if (outputPath == null || fs == null) {
-      throw new IllegalArgumentException("Incorrectly formatted output path")
-    }
-    outputPath.makeQualified(fs)
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/5140598d/sql/hive/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/SparkHadoopWriter.scala 
b/sql/hive/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
new file mode 100644
index 0000000..d96c2f7
--- /dev/null
+++ b/sql/hive/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import java.io.IOException
+import java.text.NumberFormat
+import java.util.Date
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.hive.ql.exec.{FileSinkOperator, Utilities}
+import org.apache.hadoop.hive.ql.io.{HiveFileFormatUtils, HiveOutputFormat}
+import org.apache.hadoop.hive.ql.plan.FileSinkDesc
+import org.apache.hadoop.mapred._
+import org.apache.hadoop.io.Writable
+
+/**
+ * Internal helper class that saves an RDD using a Hive OutputFormat.
+ * It is based on [[SparkHadoopWriter]].
+ */
+protected[spark]
+class SparkHiveHadoopWriter(
+    @transient jobConf: JobConf,
+    fileSinkConf: FileSinkDesc)
+  extends Logging
+  with SparkHadoopMapRedUtil
+  with Serializable {
+
+  private val now = new Date()
+  private val conf = new SerializableWritable(jobConf)
+
+  private var jobID = 0
+  private var splitID = 0
+  private var attemptID = 0
+  private var jID: SerializableWritable[JobID] = null
+  private var taID: SerializableWritable[TaskAttemptID] = null
+
+  @transient private var writer: FileSinkOperator.RecordWriter = null
+  @transient private var format: HiveOutputFormat[AnyRef, Writable] = null
+  @transient private var committer: OutputCommitter = null
+  @transient private var jobContext: JobContext = null
+  @transient private var taskContext: TaskAttemptContext = null
+
+  def preSetup() {
+    setIDs(0, 0, 0)
+    setConfParams()
+
+    val jCtxt = getJobContext()
+    getOutputCommitter().setupJob(jCtxt)
+  }
+
+
+  def setup(jobid: Int, splitid: Int, attemptid: Int) {
+    setIDs(jobid, splitid, attemptid)
+    setConfParams()
+  }
+
+  def open() {
+    val numfmt = NumberFormat.getInstance()
+    numfmt.setMinimumIntegerDigits(5)
+    numfmt.setGroupingUsed(false)
+
+    val extension = Utilities.getFileExtension(
+      conf.value,
+      fileSinkConf.getCompressed,
+      getOutputFormat())
+
+    val outputName = "part-"  + numfmt.format(splitID) + extension
+    val path = FileOutputFormat.getTaskOutputPath(conf.value, outputName)
+
+    getOutputCommitter().setupTask(getTaskContext())
+    writer = HiveFileFormatUtils.getHiveRecordWriter(
+      conf.value,
+      fileSinkConf.getTableInfo,
+      conf.value.getOutputValueClass.asInstanceOf[Class[Writable]],
+      fileSinkConf,
+      path,
+      null)
+  }
+
+  def write(value: Writable) {
+    if (writer != null) {
+      writer.write(value)
+    } else {
+      throw new IOException("Writer is null, open() has not been called")
+    }
+  }
+
+  def close() {
+    // Seems the boolean value passed into close does not matter.
+    writer.close(false)
+  }
+
+  def commit() {
+    val taCtxt = getTaskContext()
+    val cmtr = getOutputCommitter()
+    if (cmtr.needsTaskCommit(taCtxt)) {
+      try {
+        cmtr.commitTask(taCtxt)
+        logInfo (taID + ": Committed")
+      } catch {
+        case e: IOException => {
+          logError("Error committing the output of task: " + taID.value, e)
+          cmtr.abortTask(taCtxt)
+          throw e
+        }
+      }
+    } else {
+      logWarning ("No need to commit output of task: " + taID.value)
+    }
+  }
+
+  def commitJob() {
+    // always ? Or if cmtr.needsTaskCommit ?
+    val cmtr = getOutputCommitter()
+    cmtr.commitJob(getJobContext())
+  }
+
+  // ********* Private Functions *********
+
+  private def getOutputFormat(): HiveOutputFormat[AnyRef,Writable] = {
+    if (format == null) {
+      format = conf.value.getOutputFormat()
+        .asInstanceOf[HiveOutputFormat[AnyRef,Writable]]
+    }
+    format
+  }
+
+  private def getOutputCommitter(): OutputCommitter = {
+    if (committer == null) {
+      committer = conf.value.getOutputCommitter
+    }
+    committer
+  }
+
+  private def getJobContext(): JobContext = {
+    if (jobContext == null) {
+      jobContext = newJobContext(conf.value, jID.value)
+    }
+    jobContext
+  }
+
+  private def getTaskContext(): TaskAttemptContext = {
+    if (taskContext == null) {
+      taskContext =  newTaskAttemptContext(conf.value, taID.value)
+    }
+    taskContext
+  }
+
+  private def setIDs(jobid: Int, splitid: Int, attemptid: Int) {
+    jobID = jobid
+    splitID = splitid
+    attemptID = attemptid
+
+    jID = new SerializableWritable[JobID](SparkHadoopWriter.createJobID(now, 
jobid))
+    taID = new SerializableWritable[TaskAttemptID](
+      new TaskAttemptID(new TaskID(jID.value, true, splitID), attemptID))
+  }
+
+  private def setConfParams() {
+    conf.value.set("mapred.job.id", jID.value.toString)
+    conf.value.set("mapred.tip.id", taID.value.getTaskID.toString)
+    conf.value.set("mapred.task.id", taID.value.toString)
+    conf.value.setBoolean("mapred.task.is.map", true)
+    conf.value.setInt("mapred.task.partition", splitID)
+  }
+}
+
+object SparkHiveHadoopWriter {
+  def createPathFromString(path: String, conf: JobConf): Path = {
+    if (path == null) {
+      throw new IllegalArgumentException("Output path is null")
+    }
+    val outputPath = new Path(path)
+    val fs = outputPath.getFileSystem(conf)
+    if (outputPath == null || fs == null) {
+      throw new IllegalArgumentException("Incorrectly formatted output path")
+    }
+    outputPath.makeQualified(fs)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/5140598d/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveOperators.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveOperators.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveOperators.scala
index 9aa9e17..78f69e7 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveOperators.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveOperators.scala
@@ -35,7 +35,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.types.{BooleanType, DataType}
 import org.apache.spark.sql.execution._
-import org.apache.spark.{TaskContext, SparkException}
+import org.apache.spark.{SparkHiveHadoopWriter, TaskContext, SparkException}
 
 /* Implicits */
 import scala.collection.JavaConversions._

Reply via email to