Just wondering, do you get this particular exception if you are not
consolidating shuffle data?

On Wed, Jun 18, 2014 at 12:15 PM, Mridul Muralidharan <mri...@gmail.com> wrote:
> On Wed, Jun 18, 2014 at 6:19 PM, Surendranauth Hiraman
> <suren.hira...@velos.io> wrote:
>> Patrick,
>>
>> My team is using shuffle consolidation but not speculation. We are also
>> using persist(DISK_ONLY) for caching.
>
>
> Use of shuffle consolidation is probably what is causing the issue.
> It would be a good idea to try again with that turned off (which is the default).
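>
> Concretely, that just means dropping the spark.shuffle.consolidateFiles line
> from your SparkConf, or setting it back to the default, e.g.:
>
>     conf.set("spark.shuffle.consolidateFiles", "false")  // false is the default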
>
> It will most likely be fixed in the 1.1 timeframe.
>
>
> Regards,
> Mridul
>
>
>>
>> Here are some config changes that are in our work-in-progress.
>>
>> We've been trying for 2 weeks to get our production flow (maybe around
>> 50-70 stages, a few forks and joins with up to 20 branches in the forks) to
>> run end to end without any success, running into other problems besides
>> this one as well. For example, we have run into situations where saving to
>> HDFS just hangs on a couple of tasks, which are printing out nothing in
>> their logs and not taking any CPU. For testing, our input data is 10 GB
>> across 320 input splits and generates maybe around 200-300 GB of
>> intermediate and final data.
>>
>>
>>         conf.set("spark.executor.memory", "14g")     // TODO make this
>> configurable
>>
>>         // shuffle configs
>>         conf.set("spark.default.parallelism", "320") // TODO make this
>> configurable
>>         conf.set("spark.shuffle.consolidateFiles","true")
>>
>>         conf.set("spark.shuffle.file.buffer.kb", "200")
>>         conf.set("spark.reducer.maxMbInFlight", "96")
>>
>>         conf.set("spark.rdd.compress","true"
>>
>>         // we ran into a problem with the default timeout of 60 seconds
>>         // this is also being set in the master's spark-env.sh. Not sure if
>>         // it needs to be in both places
>>         conf.set("spark.worker.timeout","180")
>>
>>         // akka settings
>>         conf.set("spark.akka.threads", "300")
>>         conf.set("spark.akka.timeout", "180")
>>         conf.set("spark.akka.frameSize", "100")
>>         conf.set("spark.akka.batchSize", "30")
>>         conf.set("spark.akka.askTimeout", "30")
>>
>>         // block manager
>>         conf.set("spark.storage.blockManagerTimeoutIntervalMs", "180000")
>>         conf.set("spark.blockManagerHeartBeatMs", "80000")
>>
>> -Suren
>>
>>
>>
>> On Wed, Jun 18, 2014 at 1:42 AM, Patrick Wendell <pwend...@gmail.com> wrote:
>>
>>> Out of curiosity - are you guys using speculation, shuffle
>>> consolidation, or any other non-default option? If so, that would help
>>> narrow down what's causing this corruption.
>>>
>>> On Tue, Jun 17, 2014 at 10:40 AM, Surendranauth Hiraman
>>> <suren.hira...@velos.io> wrote:
>>> > Matt/Ryan,
>>> >
>>> > Did you make any headway on this? My team is running into this also.
>>> > Doesn't happen on smaller datasets. Our input set is about 10 GB but we
>>> > generate 100s of GBs in the flow itself.
>>> >
>>> > -Suren
>>> >
>>> >
>>> >
>>> >
>>> > On Fri, Jun 6, 2014 at 5:19 PM, Ryan Compton <compton.r...@gmail.com>
>>> wrote:
>>> >
>>> >> Just ran into this today myself. I'm on branch-1.0 using a CDH3
>>> >> cluster (no modifications to Spark or its dependencies). The error
>>> >> appeared trying to run GraphX's .connectedComponents() on a ~200GB
>>> >> edge list (GraphX worked beautifully on smaller data).
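>>> >>
>>> >> The call itself is nothing unusual; roughly this, with a placeholder
>>> >> path:
>>> >>
>>> >>     import org.apache.spark.graphx.GraphLoader
>>> >>     val graph = GraphLoader.edgeListFile(sc, "hdfs:///data/edges")
>>> >>     val cc = graph.connectedComponents().vertices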
>>> >>
>>> >> Here's the stacktrace (it's quite similar to yours
>>> >> https://imgur.com/7iBA4nJ ).
>>> >>
>>> >> 14/06/05 20:02:28 ERROR scheduler.TaskSetManager: Task 5.599:39 failed
>>> >> 4 times; aborting job
>>> >> 14/06/05 20:02:28 INFO scheduler.DAGScheduler: Failed to run reduce at
>>> >> VertexRDD.scala:100
>>> >> Exception in thread "main" org.apache.spark.SparkException: Job
>>> >> aborted due to stage failure: Task 5.599:39 failed 4 times, most
>>> >> recent failure: Exception failure in TID 29735 on host node18:
>>> >> java.io.StreamCorruptedException: invalid type code: AC
>>> >>         java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1355)
>>> >>         java.io.ObjectInputStream.readObject(ObjectInputStream.java:350)
>>> >>         org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63)
>>> >>         org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:125)
>>> >>         org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>>> >>         scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>>> >>         org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30)
>>> >>         org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>>> >>         scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>>> >>         scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>> >>         scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>> >>         org.apache.spark.graphx.impl.VertexPartitionBaseOps.innerJoinKeepLeft(VertexPartitionBaseOps.scala:192)
>>> >>         org.apache.spark.graphx.impl.EdgePartition.updateVertices(EdgePartition.scala:78)
>>> >>         org.apache.spark.graphx.impl.ReplicatedVertexView$$anonfun$2$$anonfun$apply$1.apply(ReplicatedVertexView.scala:75)
>>> >>         org.apache.spark.graphx.impl.ReplicatedVertexView$$anonfun$2$$anonfun$apply$1.apply(ReplicatedVertexView.scala:73)
>>> >>         scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>>> >>         scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>>> >>         scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>> >>         scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>> >>         org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158)
>>> >>         org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
>>> >>         org.apache.spark.scheduler.Task.run(Task.scala:51)
>>> >>         org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
>>> >>         java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
>>> >>         java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
>>> >>         java.lang.Thread.run(Thread.java:662)
>>> >> Driver stacktrace:
>>> >> at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
>>> >> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
>>> >> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
>>> >> at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>> >> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>> >> at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
>>> >> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
>>> >> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
>>> >> at scala.Option.foreach(Option.scala:236)
>>> >> at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:633)
>>> >> at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1207)
>>> >> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>>> >> at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>>> >> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>>> >> at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>>> >> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>>> >> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>> >> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>> >> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>> >> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>> >> 14/06/05 20:02:28 INFO scheduler.TaskSchedulerImpl: Cancelling stage 5
>>> >>
>>> >> On Wed, Jun 4, 2014 at 7:50 AM, Sean Owen <so...@cloudera.com> wrote:
>>> >> > On Wed, Jun 4, 2014 at 3:33 PM, Matt Kielo <mki...@oculusinfo.com>
>>> >> wrote:
>>> >> >> I'm trying to run some Spark code on a cluster but I keep running into a
>>> >> >> "java.io.StreamCorruptedException: invalid type code: AC" error. My task
>>> >> >> involves analyzing ~50GB of data (some operations involve sorting) and
>>> >> >> then writing the results out to a JSON file. I'm running the analysis on
>>> >> >> each of the data's ~10 columns and have never had a successful run. The
>>> >> >> program runs for a varying amount of time each time (between ~5 and 30
>>> >> >> minutes) but always terminates with this error.
>>> >> >
>>> >> > I can tell you that this usually means that, somewhere, something wrote
>>> >> > objects to the same OutputStream through multiple ObjectOutputStreams. AC
>>> >> > is a header value.
>>> >> >
>>> >> > I don't immediately see where/how that could happen, but maybe it rings
>>> >> > a bell for someone. It could happen if an OutputStream is reused across
>>> >> > object serializations but a new ObjectOutputStream is opened each time,
>>> >> > for example.
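>>> >> >
>>> >> > Outside of Spark, a minimal sketch of that failure mode (plain java.io,
>>> >> > nothing Spark-specific) looks roughly like:
>>> >> >
>>> >> >     import java.io._
>>> >> >
>>> >> >     val bytes = new ByteArrayOutputStream()
>>> >> >
>>> >> >     // first writer emits the stream header (0xAC 0xED ...) plus an object
>>> >> >     val out1 = new ObjectOutputStream(bytes)
>>> >> >     out1.writeObject("first")
>>> >> >     out1.flush()
>>> >> >
>>> >> >     // second writer on the SAME underlying stream emits another header
>>> >> >     val out2 = new ObjectOutputStream(bytes)
>>> >> >     out2.writeObject("second")
>>> >> >     out2.flush()
>>> >> >
>>> >> >     // a single reader then hits the stray 0xAC byte mid-stream
>>> >> >     val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
>>> >> >     in.readObject()  // "first"
>>> >> >     in.readObject()  // java.io.StreamCorruptedException: invalid type code: AC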
>>> >>
>>> >
>>> >
>>> >
>>> > --
>>> >
>>> > SUREN HIRAMAN, VP TECHNOLOGY
>>> > Velos
>>> > Accelerating Machine Learning
>>> >
>>> > 440 NINTH AVENUE, 11TH FLOOR
>>> > NEW YORK, NY 10001
>>> > O: (917) 525-2466 ext. 105
>>> > F: 646.349.4063
>>> > E: suren.hiraman@velos.io
>>> > W: www.velos.io
>>>
>>
>>
>>
>> --
>>
>> SUREN HIRAMAN, VP TECHNOLOGY
>> Velos
>> Accelerating Machine Learning
>>
>> 440 NINTH AVENUE, 11TH FLOOR
>> NEW YORK, NY 10001
>> O: (917) 525-2466 ext. 105
>> F: 646.349.4063
>> E: suren.hiraman@velos.io
>> W: www.velos.io
