3 quick questions, then some background:

1.  Is there a reason not to document the fact that spark.hadoop.*
properties are copied from the Spark config into the Hadoop config?
(A small sketch of this behavior follows these questions.)

2.  Is there a reason StreamingContext.getOrCreate defaults to a blank
Hadoop configuration rather than
org.apache.spark.deploy.SparkHadoopUtil.get.conf,
which would pull values from the Spark config?

3.  If I submit a PR to address those issues, is it likely to be lost in
the 1.2 scramble? :)
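
For concreteness, here is a small sketch of the behavior question 1 refers
to (key names and values are placeholders):

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setMaster("local[2]")              // just for a local check
      .setAppName("spark-hadoop-props")
      // Any "spark.hadoop.*" entry is copied, prefix stripped, into the
      // Hadoop Configuration that Spark builds for the context.
      .set("spark.hadoop.fs.s3n.awsAccessKeyId", "MY_ACCESS_KEY")      // placeholder
      .set("spark.hadoop.fs.s3n.awsSecretAccessKey", "MY_SECRET_KEY")  // placeholder

    val sc = new SparkContext(conf)

    // The prefix-stripped keys show up on the context's Hadoop config:
    println(sc.hadoopConfiguration.get("fs.s3n.awsAccessKeyId"))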


Background:

I have a streaming job that is not recoverable from its checkpoint, because
the S3 credentials were originally set using
sparkContext.hadoopConfiguration.set.
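
The setup looked roughly like this (keys and paths are placeholders); the
credentials only ever land on the live context's Hadoop config:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val conf = new SparkConf().setAppName("my-streaming-job")
    val ssc = new StreamingContext(conf, Seconds(30))

    // Set directly on the live Hadoop config -- not persisted by
    // checkpointing, since only the SparkConf is serialized.
    ssc.sparkContext.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "MY_ACCESS_KEY")
    ssc.sparkContext.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "MY_SECRET_KEY")

    ssc.checkpoint("s3n://XXX/checkpoints")  // placeholder path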

Checkpointing saves the Spark config, but not the transient Spark context,
so it does not save the S3 credentials unless they were originally present
in the Spark config.

Providing a Hadoop config to getOrCreate only uses that config for
CheckpointReader's initial load of the checkpoint file.  It does not copy
the Hadoop config into the newly created Spark context, so the
immediately following attempt to restore DStreamCheckpointData fails for
lack of credentials.
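
For example (assuming the getOrCreate overload that takes a Hadoop config;
keys, paths and the createContext factory here are placeholders):

    import org.apache.hadoop.conf.Configuration
    import org.apache.spark.streaming.StreamingContext

    val s3Conf = new Configuration()
    s3Conf.set("fs.s3n.awsAccessKeyId", "MY_ACCESS_KEY")      // placeholder
    s3Conf.set("fs.s3n.awsSecretAccessKey", "MY_SECRET_KEY")  // placeholder

    // s3Conf is only used by CheckpointReader to read the checkpoint file;
    // the StreamingContext rebuilt from the checkpoint gets a fresh Hadoop
    // config without these keys, so DStreamCheckpointData.restore fails.
    val ssc = StreamingContext.getOrCreate(
      "s3n://XXX/checkpoints",  // placeholder path
      () => createContext(),    // hypothetical factory for the fresh-start case
      s3Conf)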

I think the cleanest way to handle this would be to encourage people to set
Hadoop configuration through the Spark config (spark.hadoop.* properties),
and for StreamingContext.getOrCreate to use SparkHadoopUtil rather than a
blank config.
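
Roughly the pattern I'd like the docs to encourage (keys and paths are
placeholders, createContext is a hypothetical factory; today the Hadoop
config still has to be supplied explicitly, which is what question 2 is
about):

    import org.apache.spark.SparkConf
    import org.apache.spark.deploy.SparkHadoopUtil
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    def createContext(): StreamingContext = {
      val conf = new SparkConf()
        .setAppName("recoverable-stream")
        // Credentials in the SparkConf are saved with the checkpoint and
        // copied into the Hadoop config of the recreated context.
        .set("spark.hadoop.fs.s3n.awsAccessKeyId", "MY_ACCESS_KEY")      // placeholder
        .set("spark.hadoop.fs.s3n.awsSecretAccessKey", "MY_SECRET_KEY")  // placeholder
      val ssc = new StreamingContext(conf, Seconds(30))
      ssc.checkpoint("s3n://XXX/checkpoints")  // placeholder path
      // ... set up DStreams ...
      ssc
    }

    val ssc = StreamingContext.getOrCreate(
      "s3n://XXX/checkpoints",  // placeholder path
      createContext _,
      // What I'd like to be the default instead of a blank Configuration:
      SparkHadoopUtil.get.conf)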


Relevant stack trace:

14/11/04 15:37:30 INFO CheckpointReader: Checkpoint files found: s3n://XXX
14/11/04 15:37:30 INFO CheckpointReader: Attempting to load checkpoint from file s3n://XXX
14/11/04 15:37:30 INFO NativeS3FileSystem: Opening 's3n://XXX
14/11/04 15:37:31 INFO Checkpoint: Checkpoint for time 1415114220000 ms validated
14/11/04 15:37:31 INFO CheckpointReader: Checkpoint successfully loaded from file s3n://XXX
14/11/04 15:37:31 INFO CheckpointReader: Checkpoint was generated at time 1415114220000 ms
14/11/04 15:37:33 INFO DStreamGraph: Restoring checkpoint data
14/11/04 15:37:33 INFO ForEachDStream: Restoring checkpoint data
14/11/04 15:37:33 INFO StateDStream: Restoring checkpoint data
14/11/04 15:37:33 INFO DStreamCheckpointData: Restoring checkpointed RDD for time 1415097420000 ms from file 's3n://XXX

Exception in thread "main" java.lang.IllegalArgumentException: AWS Access Key ID and Secret Access Key must be specified as the username or password (respectively) of a s3n URL, or by setting the fs.s3n.awsAccessKeyId or fs.s3n.awsSecretAccessKey properties (respectively).
  at org.apache.hadoop.fs.s3.S3Credentials.initialize(S3Credentials.java:70)
  at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.initialize(Jets3tNativeFileSystemStore.java:73)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:606)
  at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:190)
  at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:103)
  at org.apache.hadoop.fs.s3native.$Proxy8.initialize(Unknown Source)
  at org.apache.hadoop.fs.s3native.NativeS3FileSystem.initialize(NativeS3FileSystem.java:272)
  at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2397)
  at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:89)
  at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2431)
  at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2413)
  at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:368)
  at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
  at org.apache.spark.rdd.CheckpointRDD.<init>(CheckpointRDD.scala:42)
  at org.apache.spark.SparkContext.checkpointFile(SparkContext.scala:824)
  at org.apache.spark.streaming.dstream.DStreamCheckpointData$$anonfun$restore$1.apply(DStreamCheckpointData.scala:112)
  at org.apache.spark.streaming.dstream.DStreamCheckpointData$$anonfun$restore$1.apply(DStreamCheckpointData.scala:109)
  at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
  at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
  at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
  at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
  at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
  at org.apache.spark.streaming.dstream.DStreamCheckpointData.restore(DStreamCheckpointData.scala:109)
  at org.apache.spark.streaming.dstream.DStream.restoreCheckpointData(DStream.scala:397)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:398)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:398)
  at scala.collection.immutable.List.foreach(List.scala:318)
  at org.apache.spark.streaming.dstream.DStream.restoreCheckpointData(DStream.scala:398)
  at org.apache.spark.streaming.DStreamGraph$$anonfun$restoreCheckpointData$2.apply(DStreamGraph.scala:149)
  at org.apache.spark.streaming.DStreamGraph$$anonfun$restoreCheckpointData$2.apply(DStreamGraph.scala:149)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
  at org.apache.spark.streaming.DStreamGraph.restoreCheckpointData(DStreamGraph.scala:149)
  at org.apache.spark.streaming.StreamingContext.<init>(StreamingContext.scala:131)
  at org.apache.spark.streaming.StreamingContext$$anonfun$getOrCreate$1.apply(StreamingContext.scala:552)
  at org.apache.spark.streaming.StreamingContext$$anonfun$getOrCreate$1.apply(StreamingContext.scala:552)
  at scala.Option.map(Option.scala:145)
  at org.apache.spark.streaming.StreamingContext$.getOrCreate(StreamingContext.scala:552)