Following up on Gerard's thoughts, here are two things that could be happening.

1. Is there another process in the background that is deleting files
in the directory where you are trying to write? It seems like the
temporary file generated by one of the tasks is getting deleted before
it is renamed to the final output file. I suggest not writing to S3 at
all and instead just counting and printing (with the rest of the
computation staying exactly the same) to see if the error still
occurs. That would narrow the culprit down to what Gerard suggested.
2. Do you have speculative execution turned on? If so, could you turn
it off and try? A sketch of both checks follows below.
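
For reference, a minimal sketch of both checks. It assumes "stream"
stands for the DStream returned by the KafkaUtils.createStream call in
your code below; everything else mirrors your snippet:

    // 1. Same computation, but count and print instead of writing to S3:
    stream.foreachRDD((rdd, time) => {
      val keyed = rdd.map { case (_, line) =>
        val json = parse(line)
        val key = extract(json, "key").getOrElse("key_not_found")
        (key, dateFormatter.format(time.milliseconds)) -> line
      }.partitionBy(new HashPartitioner(10))
      // Forces the same computation without touching S3:
      println("batch " + time + ": " + keyed.count() + " records")
    })

    // 2. Make sure speculative execution is off (it is off by default,
    // but speculative re-runs of tasks can race the output commit):
    val sparkConf = new SparkConf()
      .set("spark.speculation", "false")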

TD

On Thu, Dec 11, 2014 at 1:42 AM, Gerard Maas <gerard.m...@gmail.com> wrote:
> If the timestamps in the logs are to be trusted, it looks like your driver is
> dying with that java.io.FileNotFoundException, and therefore the workers
> lose their connection and shut down.
>
> -kr, Gerard.
>
> On Thu, Dec 11, 2014 at 7:39 AM, Akhil Das <ak...@sigmoidanalytics.com>
> wrote:
>>
>> Try adding the following to the sparkConf:
>>
>>   .set("spark.core.connection.ack.wait.timeout", "6000")
>>   .set("spark.akka.frameSize", "60")
>>
>> I used to face that issue with Spark 1.1.0.
>>
>> Thanks
>> Best Regards
>>
>> On Thu, Dec 11, 2014 at 2:14 AM, Flávio Santos
>> <bar...@chaordicsystems.com> wrote:
>>>
>>> Dear Spark'ers,
>>>
>>> I'm trying to run a simple job using Spark Streaming (version 1.1.1) and
>>> YARN (yarn-cluster), but unfortunately I'm facing many issues. In short, my
>>> job does the following:
>>> - Consumes a specific Kafka topic
>>> - Writes its content to S3 or HDFS
>>>
>>> Records in Kafka are in the form:
>>> {"key": "someString"}
>>>
>>> This is important because I use the value of "key" to define the output
>>> file name in S3.
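>>>
>>> For example, a record {"key": "user42"} (key made up for illustration)
>>> from a given batch ends up under a path like
>>> s3://BUCKET/somedir/user42/prefix-user42_<batchId>_part-NNNNN, per the
>>> output format defined further below.
>>>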
>>> Here are the Spark and Kafka parameters I'm using:
>>>
>>>> val sparkConf = new SparkConf()
>>>>   .setAppName("MyDumperApp")
>>>>   .set("spark.task.maxFailures", "100")
>>>>   .set("spark.hadoop.validateOutputSpecs", "false")
>>>>   .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>>>>   .set("spark.executor.extraJavaOptions", "-XX:+UseCompressedOops")
>>>> val kafkaParams = Map(
>>>>   "zookeeper.connect" -> zkQuorum,
>>>>   "zookeeper.session.timeout.ms" -> "10000",
>>>>   "rebalance.backoff.ms" -> "8000",
>>>>   "rebalance.max.retries" -> "10",
>>>>   "group.id" -> group,
>>>>   "auto.offset.reset" -> "largest"
>>>> )
>>>
>>>
>>> My application is the following:
>>>
>>>> KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
>>>>     ssc, kafkaParams, Map(topic -> 1), StorageLevel.MEMORY_AND_DISK_SER_2)
>>>>   .foreachRDD((rdd, time) =>
>>>>     rdd.map {
>>>>       case (_, line) =>
>>>>         val json = parse(line)
>>>>         val key = extract(json, "key").getOrElse("key_not_found")
>>>>         (key, dateFormatter.format(time.milliseconds)) -> line
>>>>     }
>>>>       .partitionBy(new HashPartitioner(10))
>>>>       .saveAsHadoopFile[KeyBasedOutput[(String, String), String]](
>>>>         "s3://BUCKET", classOf[BZip2Codec])
>>>>   )
>>>
>>>
>>> And the last piece:
>>>
>>>> // Routes each record to a per-key, per-batch output file.
>>>> class KeyBasedOutput[T >: Null, V <: AnyRef]
>>>>     extends MultipleTextOutputFormat[T, V] {
>>>>   // The key is a (myKey, batchId) tuple; it only determines the path.
>>>>   override protected def generateFileNameForKeyValue(
>>>>       key: T, value: V, leaf: String) = key match {
>>>>     case (myKey, batchId) =>
>>>>       "somedir" + "/" + myKey + "/" +
>>>>         "prefix-" + myKey + "_" + batchId + "_" + leaf
>>>>   }
>>>>   // Returning null writes only the value (the raw line) to the file.
>>>>   override protected def generateActualKey(key: T, value: V) = null
>>>> }
>>>
>>>
>>> I use batch sizes of 5 minutes with checkpoints activated.
>>> The job fails nondeterministically (I think it has never run longer than
>>> ~5 hours). I have no clue why; it simply fails.
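>>>
>>> (For completeness, the context setup looks roughly like this; the
>>> checkpoint path below is a placeholder:)
>>>
>>>> import org.apache.spark.streaming.{Minutes, StreamingContext}
>>>> val ssc = new StreamingContext(sparkConf, Minutes(5))
>>>> ssc.checkpoint("hdfs:///path/to/checkpoints")  // placeholder path
>>>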
>>> Please find below the exceptions thrown by my application.
>>>
>>> I really appreciate any kind of hint.
>>> Thank you very much in advance.
>>>
>>> Regards,
>>> -- Flávio
>>>
>>> ==== Executor 1
>>>
>>> 2014-12-10 19:05:15,150 INFO  [handle-read-write-executor-3] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing SendingConnection to ConnectionManagerId(ec2-DRIVER.compute-1.amazonaws.com,49163)
>>> 2014-12-10 19:05:15,201 INFO  [Thread-6] storage.MemoryStore (Logging.scala:logInfo(59)) - ensureFreeSpace(98665) called with curMem=194463488, maxMem=4445479895
>>> 2014-12-10 19:05:15,202 INFO  [Thread-6] storage.MemoryStore (Logging.scala:logInfo(59)) - Block input-0-1418238315000 stored as bytes in memory (estimated size 96.4 KB, free 4.0 GB)
>>> 2014-12-10 19:05:16,506 INFO  [handle-read-write-executor-2] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing ReceivingConnection to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,51443)
>>> 2014-12-10 19:05:16,506 INFO  [handle-read-write-executor-1] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing SendingConnection to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,51443)
>>> 2014-12-10 19:05:16,506 INFO  [handle-read-write-executor-2] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing SendingConnection to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,51443)
>>> 2014-12-10 19:05:16,649 INFO  [handle-read-write-executor-0] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing ReceivingConnection to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,39644)
>>> 2014-12-10 19:05:16,649 INFO  [handle-read-write-executor-3] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing SendingConnection to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,39644)
>>> 2014-12-10 19:05:16,649 INFO  [handle-read-write-executor-0] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing SendingConnection to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,39644)
>>> 2014-12-10 19:05:16,650 INFO  [connection-manager-thread] network.ConnectionManager (Logging.scala:logInfo(59)) - Key not valid ? sun.nio.ch.SelectionKeyImpl@da2e041
>>> 2014-12-10 19:05:16,651 INFO  [connection-manager-thread] network.ConnectionManager (Logging.scala:logInfo(80)) - key already cancelled ? sun.nio.ch.SelectionKeyImpl@da2e041
>>> java.nio.channels.CancelledKeyException
>>>         at org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:316)
>>>         at org.apache.spark.network.ConnectionManager$$anon$4.run(ConnectionManager.scala:145)
>>> 2014-12-10 19:05:16,679 INFO  [handle-read-write-executor-1] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing ReceivingConnection to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,39444)
>>> 2014-12-10 19:05:16,679 INFO  [handle-read-write-executor-2] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing SendingConnection to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,39444)
>>> 2014-12-10 19:05:16,679 INFO  [handle-read-write-executor-1] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing SendingConnection to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,39444)
>>> 2014-12-10 19:05:16,680 INFO  [connection-manager-thread] network.ConnectionManager (Logging.scala:logInfo(59)) - Key not valid ? sun.nio.ch.SelectionKeyImpl@6a0dd98a
>>> 2014-12-10 19:05:16,681 INFO  [connection-manager-thread] network.ConnectionManager (Logging.scala:logInfo(80)) - key already cancelled ? sun.nio.ch.SelectionKeyImpl@6a0dd98a
>>> java.nio.channels.CancelledKeyException
>>>         at org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:392)
>>>         at org.apache.spark.network.ConnectionManager$$anon$4.run(ConnectionManager.scala:145)
>>> 2014-12-10 19:05:16,717 INFO  [handle-read-write-executor-3] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing ReceivingConnection to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,57984)
>>> 2014-12-10 19:05:16,717 INFO  [handle-read-write-executor-0] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing SendingConnection to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,57984)
>>> 2014-12-10 19:05:16,717 INFO  [handle-read-write-executor-3] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing SendingConnection to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,57984)
>>> 2014-12-10 19:05:17,182 ERROR [SIGTERM handler] executor.CoarseGrainedExecutorBackend (SignalLogger.scala:handle(57)) - RECEIVED SIGNAL 15: SIGTERM
>>>
>>> ==== Executor 2
>>>
>>> 2014-12-10 19:05:15,010 INFO  [handle-message-executor-11] storage.BlockManagerMaster (Logging.scala:logInfo(59)) - Updated info of block input-0-1418238314800
>>> 2014-12-10 19:05:15,157 INFO  [connection-manager-thread] network.ConnectionManager (Logging.scala:logInfo(59)) - Key not valid ? sun.nio.ch.SelectionKeyImpl@66ea19c
>>> 2014-12-10 19:05:15,157 INFO  [handle-read-write-executor-2] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing SendingConnection to ConnectionManagerId(ec2-DRIVER.compute-1.amazonaws.com,49163)
>>> 2014-12-10 19:05:15,157 INFO  [handle-read-write-executor-0] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing ReceivingConnection to ConnectionManagerId(ec2-DRIVER.compute-1.amazonaws.com,49163)
>>> 2014-12-10 19:05:15,158 ERROR [handle-read-write-executor-0] network.ConnectionManager (Logging.scala:logError(75)) - Corresponding SendingConnection to ConnectionManagerId(ec2-DRIVER.compute-1.amazonaws.com,49163) not found
>>> 2014-12-10 19:05:15,158 INFO  [connection-manager-thread] network.ConnectionManager (Logging.scala:logInfo(80)) - key already cancelled ? sun.nio.ch.SelectionKeyImpl@66ea19c
>>> java.nio.channels.CancelledKeyException
>>>         at org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:392)
>>>         at org.apache.spark.network.ConnectionManager$$anon$4.run(ConnectionManager.scala:145)
>>>
>>> ==== Driver
>>>
>>> 2014-12-10 19:05:13,805 INFO  [sparkDriver-akka.actor.default-dispatcher-13] storage.BlockManagerInfo (Logging.scala:logInfo(59)) - Added input-0-1418238313600 in memory on ec2-EXECUTOR.compute-1.amazonaws.com:39444 (size: 38.2 KB, free: 4.1 GB)
>>> 2014-12-10 19:05:13,823 ERROR [sparkDriver-akka.actor.default-dispatcher-16] scheduler.JobScheduler (Logging.scala:logError(96)) - Error running job streaming job 1418238300000 ms.0
>>> java.io.FileNotFoundException: File s3n://BUCKET/_temporary/0/task_201412101900_0039_m_000033 does not exist.
>>>         at org.apache.hadoop.fs.s3native.NativeS3FileSystem.listStatus(NativeS3FileSystem.java:506)
>>>         at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:360)
>>>         at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:310)
>>>         at org.apache.hadoop.mapred.FileOutputCommitter.commitJob(FileOutputCommitter.java:136)
>>>         at org.apache.spark.SparkHadoopWriter.commitJob(SparkHadoopWriter.scala:126)
>>>         at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:995)
>>>         at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:878)
>>>         at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:845)
>>>         at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:803)
>>>         at MyDumperClass$$anonfun$main$1.apply(IncrementalDumpsJenkins.scala:100)
>>>         at MyDumperClass$$anonfun$main$1.apply(IncrementalDumpsJenkins.scala:79)
>>>         at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:41)
>>>         at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
>>>         at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
>>>         at scala.util.Try$.apply(Try.scala:161)
>>>         at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
>>>         at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:172)
>>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>         at java.lang.Thread.run(Thread.java:745)
>>> 2014-12-10 19:05:13,829 INFO  [Driver] yarn.ApplicationMaster (Logging.scala:logInfo(59)) - Unregistering ApplicationMaster with FAILED
>>> --
>>> Flávio R. Santos
>>>
>>> Chaordic | Platform
>>> www.chaordic.com.br
>>> +55 48 3232.3200
>>
>>
>
