Repository: spark
Updated Branches:
  refs/heads/master 3c66ff727 -> 9307f5653


[SPARK-9472] [STREAMING] consistent hadoop configuration, streaming only

Author: cody koeninger <[email protected]>

Closes #7772 from koeninger/streaming-hadoop-config and squashes the following commits:

5267284 [cody koeninger] [SPARK-4229][Streaming] consistent hadoop configuration, streaming only


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9307f565
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9307f565
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9307f565

Branch: refs/heads/master
Commit: 9307f5653d19a6a2fda355a675ca9ea97e35611b
Parents: 3c66ff7
Author: cody koeninger <[email protected]>
Authored: Thu Jul 30 17:44:20 2015 -0700
Committer: Tathagata Das <[email protected]>
Committed: Thu Jul 30 17:44:20 2015 -0700

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/streaming/Checkpoint.scala    | 3 ++-
 .../scala/org/apache/spark/streaming/StreamingContext.scala   | 7 ++++---
 .../org/apache/spark/streaming/api/java/JavaPairDStream.scala | 2 +-
 .../spark/streaming/api/java/JavaStreamingContext.scala       | 3 ++-
 4 files changed, 9 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/9307f565/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index 65d4e93..2780d5b 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -25,6 +25,7 @@ import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.conf.Configuration
 
 import org.apache.spark.{SparkException, SparkConf, Logging}
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.util.{MetadataCleaner, Utils}
 import org.apache.spark.streaming.scheduler.JobGenerator
@@ -100,7 +101,7 @@ object Checkpoint extends Logging {
     }
 
     val path = new Path(checkpointDir)
-    val fs = fsOption.getOrElse(path.getFileSystem(new Configuration()))
+    val fs = fsOption.getOrElse(path.getFileSystem(SparkHadoopUtil.get.conf))
     if (fs.exists(path)) {
       val statuses = fs.listStatus(path)
       if (statuses != null) {

http://git-wip-us.apache.org/repos/asf/spark/blob/9307f565/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 92438f1..177e710 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -34,6 +34,7 @@ import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
 
 import org.apache.spark._
 import org.apache.spark.annotation.{DeveloperApi, Experimental}
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.input.FixedLengthBinaryInputFormat
 import org.apache.spark.rdd.{RDD, RDDOperationScope}
 import org.apache.spark.serializer.SerializationDebugger
@@ -110,7 +111,7 @@ class StreamingContext private[streaming] (
    * Recreate a StreamingContext from a checkpoint file.
    * @param path Path to the directory that was specified as the checkpoint directory
    */
-  def this(path: String) = this(path, new Configuration)
+  def this(path: String) = this(path, SparkHadoopUtil.get.conf)
 
   /**
    * Recreate a StreamingContext from a checkpoint file using an existing SparkContext.
@@ -803,7 +804,7 @@ object StreamingContext extends Logging {
   def getActiveOrCreate(
       checkpointPath: String,
       creatingFunc: () => StreamingContext,
-      hadoopConf: Configuration = new Configuration(),
+      hadoopConf: Configuration = SparkHadoopUtil.get.conf,
       createOnError: Boolean = false
     ): StreamingContext = {
     ACTIVATION_LOCK.synchronized {
@@ -828,7 +829,7 @@ object StreamingContext extends Logging {
   def getOrCreate(
       checkpointPath: String,
       creatingFunc: () => StreamingContext,
-      hadoopConf: Configuration = new Configuration(),
+      hadoopConf: Configuration = SparkHadoopUtil.get.conf,
       createOnError: Boolean = false
     ): StreamingContext = {
     val checkpointOption = CheckpointReader.read(

http://git-wip-us.apache.org/repos/asf/spark/blob/9307f565/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
index 959ac9c..26383e4 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
@@ -788,7 +788,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
       keyClass: Class[_],
       valueClass: Class[_],
       outputFormatClass: Class[F],
-      conf: Configuration = new Configuration) {
+      conf: Configuration = dstream.context.sparkContext.hadoopConfiguration) {
     dstream.saveAsNewAPIHadoopFiles(prefix, suffix, keyClass, valueClass, outputFormatClass, conf)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/9307f565/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
index 40deb6d..35cc3ce 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -33,6 +33,7 @@ import org.apache.spark.annotation.Experimental
 import org.apache.spark.api.java.{JavaPairRDD, JavaRDD, JavaSparkContext}
 import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2}
 import org.apache.spark.api.java.function.{Function0 => JFunction0}
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming._
@@ -136,7 +137,7 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
    * Recreate a JavaStreamingContext from a checkpoint file.
    * @param path Path to the directory that was specified as the checkpoint directory
    */
-  def this(path: String) = this(new StreamingContext(path, new Configuration))
+  def this(path: String) = this(new StreamingContext(path, SparkHadoopUtil.get.conf))
 
   /**
    * Re-creates a JavaStreamingContext from a checkpoint file.


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to