Just wondering, do you get this particular exception if you are not consolidating shuffle data?
On Wed, Jun 18, 2014 at 12:15 PM, Mridul Muralidharan <mri...@gmail.com> wrote: > On Wed, Jun 18, 2014 at 6:19 PM, Surendranauth Hiraman > <suren.hira...@velos.io> wrote: >> Patrick, >> >> My team is using shuffle consolidation but not speculation. We are also >> using persist(DISK_ONLY) for caching. > > > Use of shuffle consolidation is probably what is causing the issue. > Would be good idea to try again with that turned off (which is the default). > > It should get fixed most likely in 1.1 timeframe. > > > Regards, > Mridul > > >> >> Here are some config changes that are in our work-in-progress. >> >> We've been trying for 2 weeks to get our production flow (maybe around >> 50-70 stages, a few forks and joins with up to 20 branches in the forks) to >> run end to end without any success, running into other problems besides >> this one as well. For example, we have run into situations where saving to >> HDFS just hangs on a couple of tasks, which are printing out nothing in >> their logs and not taking any CPU. For testing, our input data is 10 GB >> across 320 input splits and generates maybe around 200-300 GB of >> intermediate and final data. >> >> >> conf.set("spark.executor.memory", "14g") // TODO make this >> configurable >> >> // shuffle configs >> conf.set("spark.default.parallelism", "320") // TODO make this >> configurable >> conf.set("spark.shuffle.consolidateFiles","true") >> >> conf.set("spark.shuffle.file.buffer.kb", "200") >> conf.set("spark.reducer.maxMbInFlight", "96") >> >> conf.set("spark.rdd.compress","true" >> >> // we ran into a problem with the default timeout of 60 seconds >> // this is also being set in the master's spark-env.sh. Not sure if >> it needs to be in both places >> conf.set("spark.worker.timeout","180") >> >> // akka settings >> conf.set("spark.akka.threads", "300") >> conf.set("spark.akka.timeout", "180") >> conf.set("spark.akka.frameSize", "100") >> conf.set("spark.akka.batchSize", "30") >> conf.set("spark.akka.askTimeout", "30") >> >> // block manager >> conf.set("spark.storage.blockManagerTimeoutIntervalMs", "180000") >> conf.set("spark.blockManagerHeartBeatMs", "80000") >> >> -Suren >> >> >> >> On Wed, Jun 18, 2014 at 1:42 AM, Patrick Wendell <pwend...@gmail.com> wrote: >> >>> Out of curiosity - are you guys using speculation, shuffle >>> consolidation, or any other non-default option? If so that would help >>> narrow down what's causing this corruption. >>> >>> On Tue, Jun 17, 2014 at 10:40 AM, Surendranauth Hiraman >>> <suren.hira...@velos.io> wrote: >>> > Matt/Ryan, >>> > >>> > Did you make any headway on this? My team is running into this also. >>> > Doesn't happen on smaller datasets. Our input set is about 10 GB but we >>> > generate 100s of GBs in the flow itself. >>> > >>> > -Suren >>> > >>> > >>> > >>> > >>> > On Fri, Jun 6, 2014 at 5:19 PM, Ryan Compton <compton.r...@gmail.com> >>> wrote: >>> > >>> >> Just ran into this today myself. I'm on branch-1.0 using a CDH3 >>> >> cluster (no modifications to Spark or its dependencies). The error >>> >> appeared trying to run GraphX's .connectedComponents() on a ~200GB >>> >> edge list (GraphX worked beautifully on smaller data). >>> >> >>> >> Here's the stacktrace (it's quite similar to yours >>> >> https://imgur.com/7iBA4nJ ). >>> >> >>> >> 14/06/05 20:02:28 ERROR scheduler.TaskSetManager: Task 5.599:39 failed >>> >> 4 times; aborting job >>> >> 14/06/05 20:02:28 INFO scheduler.DAGScheduler: Failed to run reduce at >>> >> VertexRDD.scala:100 >>> >> Exception in thread "main" org.apache.spark.SparkException: Job >>> >> aborted due to stage failure: Task 5.599:39 failed 4 times, most >>> >> recent failure: Exception failure in TID 29735 on host node18: >>> >> java.io.StreamCorruptedException: invalid type code: AC >>> >> >>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1355) >>> >> java.io.ObjectInputStream.readObject(ObjectInputStream.java:350) >>> >> >>> >> >>> org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63) >>> >> >>> >> >>> org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:125) >>> >> >>> org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71) >>> >> scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) >>> >> >>> >> >>> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30) >>> >> >>> >> >>> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) >>> >> scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) >>> >> scala.collection.Iterator$class.foreach(Iterator.scala:727) >>> >> scala.collection.AbstractIterator.foreach(Iterator.scala:1157) >>> >> >>> >> >>> org.apache.spark.graphx.impl.VertexPartitionBaseOps.innerJoinKeepLeft(VertexPartitionBaseOps.scala:192) >>> >> >>> >> >>> org.apache.spark.graphx.impl.EdgePartition.updateVertices(EdgePartition.scala:78) >>> >> >>> >> >>> org.apache.spark.graphx.impl.ReplicatedVertexView$$anonfun$2$$anonfun$apply$1.apply(ReplicatedVertexView.scala:75) >>> >> >>> >> >>> org.apache.spark.graphx.impl.ReplicatedVertexView$$anonfun$2$$anonfun$apply$1.apply(ReplicatedVertexView.scala:73) >>> >> scala.collection.Iterator$$anon$11.next(Iterator.scala:328) >>> >> scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) >>> >> scala.collection.Iterator$class.foreach(Iterator.scala:727) >>> >> scala.collection.AbstractIterator.foreach(Iterator.scala:1157) >>> >> >>> >> >>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158) >>> >> >>> >> >>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99) >>> >> org.apache.spark.scheduler.Task.run(Task.scala:51) >>> >> >>> >> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187) >>> >> >>> >> >>> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886) >>> >> >>> >> >>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908) >>> >> java.lang.Thread.run(Thread.java:662) >>> >> Driver stacktrace: >>> >> at org.apache.spark.scheduler.DAGScheduler.org >>> >> >>> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033) >>> >> at >>> >> >>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017) >>> >> at >>> >> >>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015) >>> >> at >>> >> >>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) >>> >> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) >>> >> at >>> >> >>> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015) >>> >> at >>> >> >>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633) >>> >> at >>> >> >>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633) >>> >> at scala.Option.foreach(Option.scala:236) >>> >> at >>> >> >>> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:633) >>> >> at >>> >> >>> org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1207) >>> >> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) >>> >> at akka.actor.ActorCell.invoke(ActorCell.scala:456) >>> >> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) >>> >> at akka.dispatch.Mailbox.run(Mailbox.scala:219) >>> >> at >>> >> >>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) >>> >> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) >>> >> at >>> >> >>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) >>> >> at >>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) >>> >> at >>> >> >>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) >>> >> 14/06/05 20:02:28 INFO scheduler.TaskSchedulerImpl: Cancelling stage 5 >>> >> >>> >> On Wed, Jun 4, 2014 at 7:50 AM, Sean Owen <so...@cloudera.com> wrote: >>> >> > On Wed, Jun 4, 2014 at 3:33 PM, Matt Kielo <mki...@oculusinfo.com> >>> >> wrote: >>> >> >> Im trying run some spark code on a cluster but I keep running into a >>> >> >> "java.io.StreamCorruptedException: invalid type code: AC" error. My >>> task >>> >> >> involves analyzing ~50GB of data (some operations involve sorting) >>> then >>> >> >> writing them out to a JSON file. Im running the analysis on each of >>> the >>> >> >> data's ~10 columns and have never had a successful run. My program >>> >> seems to >>> >> >> run for a varying amount of time each time (~between 5-30 minutes) >>> but >>> >> it >>> >> >> always terminates with this error. >>> >> > >>> >> > I can tell you that this usually means somewhere something wrote >>> >> > objects to the same OutputStream with multiple ObjectOutputStreams. AC >>> >> > is a header value. >>> >> > >>> >> > I don't obviously see where/how that could happen, but maybe it rings >>> >> > a bell for someone. This could happen if an OutputStream is reused >>> >> > across object serializations but new ObjectOutputStreams are opened, >>> >> > for example. >>> >> >>> > >>> > >>> > >>> > -- >>> > >>> > SUREN HIRAMAN, VP TECHNOLOGY >>> > Velos >>> > Accelerating Machine Learning >>> > >>> > 440 NINTH AVENUE, 11TH FLOOR >>> > NEW YORK, NY 10001 >>> > O: (917) 525-2466 ext. 105 >>> > F: 646.349.4063 >>> > E: suren.hiraman@v <suren.hira...@sociocast.com>elos.io >>> > W: www.velos.io >>> >> >> >> >> -- >> >> SUREN HIRAMAN, VP TECHNOLOGY >> Velos >> Accelerating Machine Learning >> >> 440 NINTH AVENUE, 11TH FLOOR >> NEW YORK, NY 10001 >> O: (917) 525-2466 ext. 105 >> F: 646.349.4063 >> E: suren.hiraman@v <suren.hira...@sociocast.com>elos.io >> W: www.velos.io