[ 
https://issues.apache.org/jira/browse/SPARK-5250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14963110#comment-14963110
 ] 

Mojmir Vinkler commented on SPARK-5250:
---------------------------------------

Yes, it's caused by reading a corrupt file (we only experienced this for 
compressed (gzipped) files). I think the file got corrupted when it was saved 
to S3, but we used boto for that, not Spark. What's weird is that I'm able to 
read the file with pandas without any problems.

> EOFException in when reading gzipped files from S3 with wholeTextFiles
> ----------------------------------------------------------------------
>
>                 Key: SPARK-5250
>                 URL: https://issues.apache.org/jira/browse/SPARK-5250
>             Project: Spark
>          Issue Type: Sub-task
>          Components: Spark Core
>    Affects Versions: 1.2.0
>            Reporter: Mojmir Vinkler
>            Priority: Critical
>
> I get an `EOFException` error when reading *some* gzipped files using 
> `sc.wholeTextFiles`. It happens to just a few files, I thought that the file 
> is corrupted, but I was able to read it without problems using `sc.textFile` 
> (and pandas). 
> Traceback for command 
> `sc.wholeTextFiles('s3n://s3bucket/2525322021051.csv.gz').collect()`
> {code}
> ---------------------------------------------------------------------------
> Py4JJavaError                             Traceback (most recent call last)
> <ipython-input-104-943aab11de03> in <module>()
> ----> 1 sc.wholeTextFiles('s3n://s3bucket/2525322021051.csv.gz').collect()
> /home/ubuntu/databricks/spark/python/pyspark/rdd.py in collect(self)
>     674         """
>     675         with SCCallSiteSync(self.context) as css:
> --> 676             bytesInJava = self._jrdd.collect().iterator()
>     677         return list(self._collect_iterator_through_file(bytesInJava))
>     678 
> /home/ubuntu/databricks/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py
>  in __call__(self, *args)
>     536         answer = self.gateway_client.send_command(command)
>     537         return_value = get_return_value(answer, self.gateway_client,
> --> 538                 self.target_id, self.name)
>     539 
>     540         for temp_arg in temp_args:
> /home/ubuntu/databricks/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py
>  in get_return_value(answer, gateway_client, target_id, name)
>     298                 raise Py4JJavaError(
>     299                     'An error occurred while calling {0}{1}{2}.\n'.
> --> 300                     format(target_id, '.', name), value)
>     301             else:
>     302                 raise Py4JError(
> Py4JJavaError: An error occurred while calling o1576.collect.
> : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 
> in stage 41.0 failed 4 times, most recent failure: Lost task 0.3 in stage 
> 41.0 (TID 4720, ip-10-0-241-126.ec2.internal): java.io.EOFException: 
> Unexpected end of input stream
>       at 
> org.apache.hadoop.io.compress.DecompressorStream.decompress(DecompressorStream.java:137)
>       at 
> org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:77)
>       at java.io.InputStream.read(InputStream.java:101)
>       at com.google.common.io.ByteStreams.copy(ByteStreams.java:207)
>       at com.google.common.io.ByteStreams.toByteArray(ByteStreams.java:252)
>       at 
> org.apache.spark.input.WholeTextFileRecordReader.nextKeyValue(WholeTextFileRecordReader.scala:73)
>       at 
> org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader.nextKeyValue(CombineFileRecordReader.java:69)
>       at 
> org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:145)
>       at 
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>       at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>       at 
> org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
>       at 
> scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>       at 
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>       at 
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
>       at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
>       at 
> org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
>       at 
> scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
>       at 
> org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
>       at 
> scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
>       at 
> org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
>       at org.apache.spark.rdd.RDD$$anonfun$16.apply(RDD.scala:780)
>       at org.apache.spark.rdd.RDD$$anonfun$16.apply(RDD.scala:780)
>       at 
> org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1314)
>       at 
> org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1314)
>       at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>       at org.apache.spark.scheduler.Task.run(Task.scala:56)
>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>       at java.lang.Thread.run(Thread.java:745)
> Driver stacktrace:
>       at 
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
>       at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
>       at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
>       at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>       at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>       at 
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
>       at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
>       at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
>       at scala.Option.foreach(Option.scala:236)
>       at 
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696)
>       at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420)
>       at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>       at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundReceive(DAGScheduler.scala:1375)
>       at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>       at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>       at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
>       at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>       at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
>       at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>       at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>       at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>       at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to